mirror of https://github.com/apache/druid.git
Improve code quality and unit test coverage of the Azure extension (#9265)
* IMPLY-1946: Improve code quality and unit test coverage of the Azure extension
* Update unit tests to increase test coverage for the extension
* Clean up any messy code
* Enforce code coverage as part of tests
* Update the Azure extension pom to remove unnecessary things
* Update JaCoCo thresholds
* Upgrade the azure-storage library to the most up-to-date version
* Exclude common libraries that are already included from Druid core
* Address review comments
This commit is contained in:
parent b411443d22
commit a085685182
pom.xml

@@ -45,7 +45,7 @@
     <dependency>
       <groupId>com.microsoft.azure</groupId>
       <artifactId>azure-storage</artifactId>
-      <version>2.1.0</version>
+      <version>8.6.0</version>
       <exclusions>
         <exclusion>
           <groupId>org.slf4j</groupId>
@@ -59,12 +59,17 @@
           <groupId>org.apache.commons</groupId>
           <artifactId>commons-lang3</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-guice</artifactId>
       <version>${jackson.version}</version>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>commons-io</groupId>
@@ -84,6 +89,7 @@
     <dependency>
       <groupId>com.google.inject</groupId>
       <artifactId>guice</artifactId>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
@@ -103,6 +109,7 @@
     <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
+     <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>javax.validation</groupId>
@@ -135,4 +142,66 @@
     </dependency>
   </dependencies>

+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+        <version>0.8.4</version>
+        <configuration>
+          <excludes>
+            <!-- Ignore firehose code -->
+            <exclude>org/apache/druid/firehose/azure/**/*</exclude> <!-- integration-tests -->
+          </excludes>
+          <rules>
+            <rule>
+              <element>BUNDLE</element>
+              <limits>
+                <limit>
+                  <counter>INSTRUCTION</counter>
+                  <value>COVEREDRATIO</value>
+                  <minimum>0.79</minimum>
+                </limit>
+                <limit>
+                  <counter>LINE</counter>
+                  <value>COVEREDRATIO</value>
+                  <minimum>0.80</minimum>
+                </limit>
+                <limit>
+                  <counter>BRANCH</counter>
+                  <value>COVEREDRATIO</value>
+                  <minimum>0.70</minimum>
+                </limit>
+                <limit>
+                  <counter>COMPLEXITY</counter>
+                  <value>COVEREDRATIO</value>
+                  <minimum>0.73</minimum>
+                </limit>
+                <limit>
+                  <counter>METHOD</counter>
+                  <value>COVEREDRATIO</value>
+                  <minimum>0.76</minimum>
+                </limit>
+                <limit>
+                  <counter>CLASS</counter>
+                  <value>COVEREDRATIO</value>
+                  <minimum>0.83</minimum>
+                </limit>
+              </limits>
+            </rule>
+          </rules>
+        </configuration>
+        <executions>
+          <execution>
+            <id>report</id>
+            <phase>test</phase>
+            <goals>
+              <goal>report</goal>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
 </project>
AzureDataSegmentPusher.java

@@ -19,6 +19,7 @@
 package org.apache.druid.storage.azure;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
@@ -40,8 +41,8 @@ import java.util.Map;
 public class AzureDataSegmentPusher implements DataSegmentPusher
 {
   private static final Logger log = new Logger(AzureDataSegmentPusher.class);
+  static final List<String> ALLOWED_PROPERTY_PREFIXES_FOR_HADOOP = ImmutableList.of("druid.azure");
   private final AzureStorage azureStorage;
   private final AzureAccountConfig config;

@@ -103,37 +104,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
   @Override
   public List<String> getAllowedPropertyPrefixesForHadoop()
   {
-    return ImmutableList.of("druid.azure");
-  }
-
-  public String getAzurePath(final DataSegment segment, final boolean useUniquePath)
-  {
-    final String storageDir = this.getStorageDir(segment, useUniquePath);
-
-    return StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME);
-
-  }
-
-  public DataSegment uploadDataSegment(
-      DataSegment segment,
-      final int binaryVersion,
-      final long size,
-      final File compressedSegmentData,
-      final String azurePath
-  )
-      throws StorageException, IOException, URISyntaxException
-  {
-    azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePath);
-
-    final DataSegment outSegment = segment
-        .withSize(size)
-        .withLoadSpec(this.makeLoadSpec(new URI(azurePath)))
-        .withBinaryVersion(binaryVersion);
-
-    log.info("Deleting file [%s]", compressedSegmentData);
-    compressedSegmentData.delete();
-
-    return outSegment;
+    return ALLOWED_PROPERTY_PREFIXES_FOR_HADOOP;
   }

   @Override
@@ -179,4 +150,36 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
         uri.toString()
     );
   }
+
+  @VisibleForTesting
+  String getAzurePath(final DataSegment segment, final boolean useUniquePath)
+  {
+    final String storageDir = this.getStorageDir(segment, useUniquePath);
+
+    return StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME);
+
+  }
+
+  @VisibleForTesting
+  DataSegment uploadDataSegment(
+      DataSegment segment,
+      final int binaryVersion,
+      final long size,
+      final File compressedSegmentData,
+      final String azurePath
+  )
+      throws StorageException, IOException, URISyntaxException
+  {
+    azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePath);
+
+    final DataSegment outSegment = segment
+        .withSize(size)
+        .withLoadSpec(this.makeLoadSpec(new URI(azurePath)))
+        .withBinaryVersion(binaryVersion);
+
+    log.debug("Deleting file [%s]", compressedSegmentData);
+    compressedSegmentData.delete();
+
+    return outSegment;
+  }
 }
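Since getAzurePath and uploadDataSegment are now package-private and @VisibleForTesting, a test in the same package can exercise them directly instead of only through push(). A minimal sketch, assuming azureStorage and azureAccountConfig are EasyMock mocks as in the tests below; segment, size, and zipFile are hypothetical fixtures set up by the caller:

    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
    // Resolves to "<storageDir>/index.zip" (AzureStorageDruidModule.INDEX_ZIP_FILE_NAME)
    String azurePath = pusher.getAzurePath(segment, false);
    // Uploads via the (mocked) AzureStorage; 9 is an illustrative binaryVersion.
    // Returns the segment with size, loadSpec, and binaryVersion rewritten.
    DataSegment pushed = pusher.uploadDataSegment(segment, 9, size, zipFile, azurePath);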
AzureStorage.java

@@ -19,6 +19,7 @@
 package org.apache.druid.storage.azure;

+import com.google.common.annotations.VisibleForTesting;
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
@@ -48,20 +49,11 @@ public class AzureStorage
     this.cloudBlobClient = cloudBlobClient;
   }

-  public CloudBlobContainer getCloudBlobContainer(final String containerName)
-      throws StorageException, URISyntaxException
-  {
-    CloudBlobContainer cloudBlobContainer = cloudBlobClient.getContainerReference(containerName);
-    cloudBlobContainer.createIfNotExists();
-
-    return cloudBlobContainer;
-  }
-
   public List<String> emptyCloudBlobDirectory(final String containerName, final String virtualDirPath)
       throws StorageException, URISyntaxException
   {
     List<String> deletedFiles = new ArrayList<>();
-    CloudBlobContainer container = getCloudBlobContainer(containerName);
+    CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);

     for (ListBlobItem blobItem : container.listBlobs(virtualDirPath, true, null, null, null)) {
       CloudBlob cloudBlob = (CloudBlob) blobItem;
@@ -81,7 +73,7 @@ public class AzureStorage
   public void uploadBlob(final File file, final String containerName, final String blobPath)
       throws IOException, StorageException, URISyntaxException
   {
-    CloudBlobContainer container = getCloudBlobContainer(containerName);
+    CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
     try (FileInputStream stream = new FileInputStream(file)) {
       container.getBlockBlobReference(blobPath).upload(stream, file.length());
     }
@@ -90,18 +82,39 @@ public class AzureStorage
   public long getBlobLength(final String containerName, final String blobPath)
       throws URISyntaxException, StorageException
   {
-    return getCloudBlobContainer(containerName).getBlockBlobReference(blobPath).getProperties().getLength();
+    return getOrCreateCloudBlobContainer(containerName).getBlockBlobReference(blobPath).getProperties().getLength();
   }

   public InputStream getBlobInputStream(final String containerName, final String blobPath)
       throws URISyntaxException, StorageException
   {
-    CloudBlobContainer container = getCloudBlobContainer(containerName);
-    return container.getBlockBlobReference(blobPath).openInputStream();
+    return getBlobInputStream(0L, containerName, blobPath);
   }
+
+  public InputStream getBlobInputStream(long offset, final String containerName, final String blobPath)
+      throws URISyntaxException, StorageException
+  {
+    CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
+    return container.getBlockBlobReference(blobPath).openInputStream(offset, null, null, null, null);
+  }

   public boolean getBlobExists(String container, String blobPath) throws URISyntaxException, StorageException
   {
-    return getCloudBlobContainer(container).getBlockBlobReference(blobPath).exists();
+    return getOrCreateCloudBlobContainer(container).getBlockBlobReference(blobPath).exists();
   }
+
+  @VisibleForTesting
+  CloudBlobClient getCloudBlobClient()
+  {
+    return this.cloudBlobClient;
+  }
+
+  private CloudBlobContainer getOrCreateCloudBlobContainer(final String containerName)
+      throws StorageException, URISyntaxException
+  {
+    CloudBlobContainer cloudBlobContainer = cloudBlobClient.getContainerReference(containerName);
+    cloudBlobContainer.createIfNotExists();
+
+    return cloudBlobContainer;
+  }
 }
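The new overload threads a byte offset through to the block blob's openInputStream call, and the old one-argument form now simply delegates with offset 0. A hedged usage sketch (the container and blob names are illustrative):

    // Resume reading "prefix/index.zip" in container "druid" from byte 1024.
    try (InputStream in = azureStorage.getBlobInputStream(1024L, "druid", "prefix/index.zip")) {
      // consume the remainder of the blob
    }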
AzureStorageDruidModule.java

@@ -42,7 +42,7 @@ import java.util.List;
 public class AzureStorageDruidModule implements DruidModule
 {
-  public static final String SCHEME = "azure";
+  static final String SCHEME = "azure";
   public static final String STORAGE_CONNECTION_STRING = "DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s";
   public static final String INDEX_ZIP_FILE_NAME = "index.zip";

AzureDataSegmentKillerTest.java

@@ -21,6 +21,7 @@ package org.apache.druid.storage.azure;

 import com.google.common.collect.ImmutableMap;
 import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.StorageExtendedErrorInformation;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.timeline.DataSegment;
@@ -52,6 +53,9 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
       1
   );

+  private static final StorageExtendedErrorInformation NULL_STORAGE_EXTENDED_ERROR_INFORMATION = null;
+  private static final StorageExtendedErrorInformation STORAGE_EXTENDED_ERROR_INFORMATION = new StorageExtendedErrorInformation();
+
   private AzureStorage azureStorage;

   @Before
@@ -79,9 +83,54 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
   }

   @Test(expected = SegmentLoadingException.class)
-  public void killWithErrorTest() throws SegmentLoadingException, URISyntaxException, StorageException
+  public void test_kill_StorageExceptionExtendedErrorInformationNull_throwsException()
+      throws SegmentLoadingException, URISyntaxException, StorageException
   {
+    common_test_kill_StorageExceptionExtendedError_throwsException(NULL_STORAGE_EXTENDED_ERROR_INFORMATION);
+  }
+
+  @Test(expected = SegmentLoadingException.class)
+  public void test_kill_StorageExceptionExtendedErrorInformationNotNull_throwsException()
+      throws SegmentLoadingException, URISyntaxException, StorageException
+  {
+    common_test_kill_StorageExceptionExtendedError_throwsException(STORAGE_EXTENDED_ERROR_INFORMATION);
+  }
+
+  @Test(expected = SegmentLoadingException.class)
+  public void test_kill_URISyntaxException_throwsException()
+      throws SegmentLoadingException, URISyntaxException, StorageException
+  {
+    String dirPath = Paths.get(BLOB_PATH).getParent().toString();
+
+    EasyMock.expect(azureStorage.emptyCloudBlobDirectory(CONTAINER_NAME, dirPath)).andThrow(
+        new URISyntaxException(
+            "",
+            ""
+        )
+    );
+
+    replayAll();
+
+    AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage);
+
+    killer.kill(DATA_SEGMENT);
+
+    verifyAll();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void test_killAll_throwsUnsupportedOperationException()
+  {
+    AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage);
+    killer.killAll();
+  }
+
+  private void common_test_kill_StorageExceptionExtendedError_throwsException(StorageExtendedErrorInformation storageExtendedErrorInformation)
+      throws SegmentLoadingException, URISyntaxException, StorageException
+  {
     String dirPath = Paths.get(BLOB_PATH).getParent().toString();

     EasyMock.expect(azureStorage.emptyCloudBlobDirectory(CONTAINER_NAME, dirPath)).andThrow(
@@ -89,7 +138,7 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
             "",
             "",
             400,
-            null,
+            storageExtendedErrorInformation,
             null
         )
     );
AzureDataSegmentPullerTest.java

@@ -39,7 +39,8 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport

   private static final String SEGMENT_FILE_NAME = "segment";
   private static final String CONTAINER_NAME = "container";
-  private static final String BLOB_PATH = "/path/to/storage/index.zip";
+  private static final String BLOB_PATH = "path/to/storage/index.zip";
+  private static final String BLOB_PATH_HADOOP = AzureDataSegmentPuller.AZURE_STORAGE_HOST_ADDRESS + "/path/to/storage/index.zip";
   private AzureStorage azureStorage;

   @Before
@@ -49,7 +50,8 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
   }

   @Test
-  public void testZIPUncompress() throws SegmentLoadingException, URISyntaxException, StorageException, IOException
+  public void test_getSegmentFiles_success()
+      throws SegmentLoadingException, URISyntaxException, StorageException, IOException
   {
     final String value = "bucket";
     final File pulledFile = AzureTestUtils.createZipTempFile(SEGMENT_FILE_NAME, value);
@@ -78,8 +80,39 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
     }
   }

+  @Test
+  public void test_getSegmentFiles_blobPathIsHadoop_success()
+      throws SegmentLoadingException, URISyntaxException, StorageException, IOException
+  {
+    final String value = "bucket";
+    final File pulledFile = AzureTestUtils.createZipTempFile(SEGMENT_FILE_NAME, value);
+    final File toDir = FileUtils.createTempDir();
+    try {
+      final InputStream zipStream = new FileInputStream(pulledFile);
+
+      EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream);
+
+      replayAll();
+
+      AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);
+
+      FileUtils.FileCopyResult result = puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH_HADOOP, toDir);
+
+      File expected = new File(toDir, SEGMENT_FILE_NAME);
+      Assert.assertEquals(value.length(), result.size());
+      Assert.assertTrue(expected.exists());
+      Assert.assertEquals(value.length(), expected.length());
+
+      verifyAll();
+    }
+    finally {
+      pulledFile.delete();
+      FileUtils.deleteDirectory(toDir);
+    }
+  }
+
   @Test(expected = RuntimeException.class)
-  public void testDeleteOutputDirectoryWhenErrorIsRaisedPullingSegmentFiles()
+  public void test_getSegmentFiles_nonRecoverableErrorRaisedWhenPullingSegmentFiles_doNotDeleteOutputDirectory()
       throws IOException, URISyntaxException, StorageException, SegmentLoadingException
   {
@@ -98,11 +131,43 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
       AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);

       puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, outDir);
     }
+    catch (Exception e) {
+      Assert.assertTrue(outDir.exists());
+      verifyAll();
+      throw e;
+    }
     finally {
       FileUtils.deleteDirectory(outDir);
     }
   }
+
+  @Test(expected = SegmentLoadingException.class)
+  public void test_getSegmentFiles_recoverableErrorRaisedWhenPullingSegmentFiles_deleteOutputDirectory()
+      throws IOException, URISyntaxException, StorageException, SegmentLoadingException
+  {
+    final File outDir = FileUtils.createTempDir();
+    try {
+      EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER_NAME, BLOB_PATH)).andThrow(
+          new StorageException(null, null, 0, null, null)
+      ).atLeastOnce();
+
+      replayAll();
+
+      AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);
+
+      puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, outDir);
+
+      Assert.assertFalse(outDir.exists());
+
+      verifyAll();
+    }
+    catch (Exception e) {
+      Assert.assertFalse(outDir.exists());
+      verifyAll();
+      throw e;
+    }
+    finally {
+      FileUtils.deleteDirectory(outDir);
+    }
AzureDataSegmentPusherTest.java

@@ -42,6 +42,7 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;

@@ -63,6 +64,21 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
       0,
       1
   );
+  private static final byte[] DATA = new byte[]{0x0, 0x0, 0x0, 0x1};
+  private static final String UNIQUE_MATCHER = "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip";
+  private static final String NON_UNIQUE_MATCHER = "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/index\\.zip";
+
+  private static final DataSegment SEGMENT_TO_PUSH = new DataSegment(
+      "foo",
+      Intervals.of("2015/2016"),
+      "0",
+      new HashMap<>(),
+      new ArrayList<>(),
+      new ArrayList<>(),
+      NoneShardSpec.instance(),
+      0,
+      DATA.length
+  );

   private AzureStorage azureStorage;
   private AzureAccountConfig azureAccountConfig;
@@ -80,48 +96,91 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
   }

   @Test
-  public void testPush() throws Exception
-  {
-    testPushInternal(false, "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/index\\.zip");
-  }
-
-  @Test
-  public void testPushUseUniquePath() throws Exception
-  {
-    testPushInternal(true, "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip");
-  }
-
-  private void testPushInternal(boolean useUniquePath, String matcher) throws Exception
+  public void test_push_nonUniquePath_succeeds() throws Exception
   {
+    boolean useUniquePath = false;
     AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);

     // Create a mock segment on disk
     File tmp = tempFolder.newFile("version.bin");

-    final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
-    Files.write(data, tmp);
-    final long size = data.length;
+    Files.write(DATA, tmp);

-    DataSegment segmentToPush = new DataSegment(
-        "foo",
-        Intervals.of("2015/2016"),
-        "0",
-        new HashMap<>(),
-        new ArrayList<>(),
-        new ArrayList<>(),
-        NoneShardSpec.instance(),
-        0,
-        size
-    );
+    String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
+    azureStorage.uploadBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
+    EasyMock.expectLastCall();

-    DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, useUniquePath);
+    replayAll();
+
+    DataSegment segment = pusher.push(tempFolder.getRoot(), SEGMENT_TO_PUSH, useUniquePath);

     Assert.assertTrue(
         segment.getLoadSpec().get("blobPath").toString(),
-        Pattern.compile(matcher).matcher(segment.getLoadSpec().get("blobPath").toString()).matches()
+        Pattern.compile(NON_UNIQUE_MATCHER).matcher(segment.getLoadSpec().get("blobPath").toString()).matches()
     );

-    Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
+    Assert.assertEquals(SEGMENT_TO_PUSH.getSize(), segment.getSize());
+
+    verifyAll();
   }

+  @Test
+  public void test_push_uniquePath_succeeds() throws Exception
+  {
+    boolean useUniquePath = true;
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
+
+    // Create a mock segment on disk
+    File tmp = tempFolder.newFile("version.bin");
+
+    Files.write(DATA, tmp);
+
+    String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
+    azureStorage.uploadBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.matches(UNIQUE_MATCHER));
+    EasyMock.expectLastCall();
+
+    replayAll();
+
+    DataSegment segment = pusher.push(tempFolder.getRoot(), SEGMENT_TO_PUSH, useUniquePath);
+
+    Assert.assertTrue(
+        segment.getLoadSpec().get("blobPath").toString(),
+        Pattern.compile(UNIQUE_MATCHER).matcher(segment.getLoadSpec().get("blobPath").toString()).matches()
+    );
+
+    Assert.assertEquals(SEGMENT_TO_PUSH.getSize(), segment.getSize());
+
+    verifyAll();
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void test_push_exception_throwsException() throws Exception
+  {
+    boolean useUniquePath = true;
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
+
+    // Create a mock segment on disk
+    File tmp = tempFolder.newFile("version.bin");
+
+    Files.write(DATA, tmp);
+    final long size = DATA.length;
+
+    String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
+    azureStorage.uploadBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
+    EasyMock.expectLastCall().andThrow(new URISyntaxException("", ""));
+
+    replayAll();
+
+    DataSegment segment = pusher.push(tempFolder.getRoot(), SEGMENT_TO_PUSH, useUniquePath);
+
+    Assert.assertTrue(
+        segment.getLoadSpec().get("blobPath").toString(),
+        Pattern.compile(UNIQUE_MATCHER).matcher(segment.getLoadSpec().get("blobPath").toString()).matches()
+    );
+
+    Assert.assertEquals(SEGMENT_TO_PUSH.getSize(), segment.getSize());
+
+    verifyAll();
+  }
+
   @Test
@@ -176,6 +235,22 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     Assert.assertEquals("wasbs://container@account.blob.core.windows.net/", hadoopPath);
   }

+  @Test
+  public void test_getPathForHadoop_noArgs_succeeds()
+  {
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
+    String hadoopPath = pusher.getPathForHadoop("");
+    Assert.assertEquals("wasbs://container@account.blob.core.windows.net/", hadoopPath);
+  }
+
+  @Test
+  public void test_getAllowedPropertyPrefixesForHadoop_returnsExpcetedPropertyPrefixes()
+  {
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
+    List<String> actualPropertyPrefixes = pusher.getAllowedPropertyPrefixesForHadoop();
+    Assert.assertEquals(AzureDataSegmentPusher.ALLOWED_PROPERTY_PREFIXES_FOR_HADOOP, actualPropertyPrefixes);
+  }
+
   @Test
   public void storageDirContainsNoColonsTest()
   {
AzureStorageDruidModuleTest.java (new file)

@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.JsonConfigurator;
+import org.apache.druid.guice.LazySingleton;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.validation.Validation;
+import javax.validation.Validator;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Properties;
+
+public class AzureStorageDruidModuleTest
+{
+  private static final String AZURE_ACCOUNT_NAME;
+  private static final String AZURE_ACCOUNT_KEY;
+  private static final String AZURE_CONTAINER;
+  private Injector injector;
+
+  static {
+    try {
+      AZURE_ACCOUNT_NAME = "azureAccount1";
+      AZURE_ACCOUNT_KEY = Base64.getUrlEncoder()
+                                .encodeToString("azureKey1".getBytes(StandardCharsets.UTF_8.toString()));
+      AZURE_CONTAINER = "azureContainer1";
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void test_getBlobClient_expectedClient()
+  {
+    final Properties props = new Properties();
+    props.put("druid.azure.account", AZURE_ACCOUNT_NAME);
+    props.put("druid.azure.key", AZURE_ACCOUNT_KEY);
+    props.put("druid.azure.container", AZURE_CONTAINER);
+    injector = makeInjectorWithProperties(props);
+    AzureAccountConfig azureAccountConfig = injector.getInstance(Key.get(AzureAccountConfig.class));
+
+    Assert.assertEquals(AZURE_ACCOUNT_NAME, azureAccountConfig.getAccount());
+    Assert.assertEquals(AZURE_ACCOUNT_KEY, azureAccountConfig.getKey());
+    Assert.assertEquals(AZURE_CONTAINER, azureAccountConfig.getContainer());
+
+    CloudBlobClient cloudBlobClient = injector.getInstance(CloudBlobClient.class);
+    StorageCredentials storageCredentials = cloudBlobClient.getCredentials();
+
+    Assert.assertEquals(AZURE_ACCOUNT_NAME, storageCredentials.getAccountName());
+  }
+
+  @Test
+  public void test_getAzureStorageContainer_expectedClient()
+  {
+    final Properties props = new Properties();
+    props.put("druid.azure.account", AZURE_ACCOUNT_NAME);
+    props.put("druid.azure.key", AZURE_ACCOUNT_KEY);
+    props.put("druid.azure.container", AZURE_CONTAINER);
+    injector = makeInjectorWithProperties(props);
+    AzureAccountConfig azureAccountConfig = injector.getInstance(Key.get(AzureAccountConfig.class));
+
+    Assert.assertEquals(AZURE_ACCOUNT_NAME, azureAccountConfig.getAccount());
+    Assert.assertEquals(AZURE_ACCOUNT_KEY, azureAccountConfig.getKey());
+    Assert.assertEquals(AZURE_CONTAINER, azureAccountConfig.getContainer());
+
+    CloudBlobClient cloudBlobClient = injector.getInstance(CloudBlobClient.class);
+    StorageCredentials storageCredentials = cloudBlobClient.getCredentials();
+
+    Assert.assertEquals(AZURE_ACCOUNT_NAME, storageCredentials.getAccountName());
+
+    AzureStorage azureStorage = injector.getInstance(AzureStorage.class);
+    Assert.assertSame(cloudBlobClient, azureStorage.getCloudBlobClient());
+  }
+
+  private Injector makeInjectorWithProperties(final Properties props)
+  {
+    return Guice.createInjector(
+        ImmutableList.of(
+            new DruidGuiceExtensions(),
+            new Module()
+            {
+              @Override
+              public void configure(Binder binder)
+              {
+                binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
+                binder.bind(JsonConfigurator.class).in(LazySingleton.class);
+                binder.bind(Properties.class).toInstance(props);
+              }
+            },
+            new AzureStorageDruidModule()
+        ));
+  }
+}
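The makeInjectorWithProperties helper above is a reusable pattern for extension-module tests: binding a Properties instance lets JsonConfigurator populate the module's config classes the same way runtime server properties would. A small sketch of reusing it with different values (the property values are illustrative):

    final Properties props = new Properties();
    props.put("druid.azure.account", "someAccount");
    props.put("druid.azure.key", "c29tZUtleQ==");  // any Base64 string suffices for config binding
    props.put("druid.azure.container", "someContainer");
    AzureAccountConfig config = makeInjectorWithProperties(props).getInstance(AzureAccountConfig.class);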
AzureTaskLogsTest.java

@@ -26,13 +26,16 @@ import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;

 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;

 public class AzureTaskLogsTest extends EasyMockSupport
@@ -41,6 +44,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
   private static final String CONTAINER = "test";
   private static final String PREFIX = "test/log";
   private static final String TASK_ID = "taskid";
+  private static final String TASK_ID_NOT_FOUND = "taskidNotFound";
   private static final AzureTaskLogsConfig AZURE_TASK_LOGS_CONFIG = new AzureTaskLogsConfig(CONTAINER, PREFIX, 3);

   private AzureStorage azureStorage;
@@ -55,7 +59,7 @@ public class AzureTaskLogsTest extends EasyMockSupport


   @Test
-  public void testPushTaskLog() throws Exception
+  public void test_PushTaskLog_uploadsBlob() throws Exception
   {
     final File tmpDir = FileUtils.createTempDir();

@@ -76,6 +80,72 @@ public class AzureTaskLogsTest extends EasyMockSupport
     }
   }

+  @Test(expected = RuntimeException.class)
+  public void test_PushTaskLog_exception_rethrowsException() throws Exception
+  {
+    final File tmpDir = FileUtils.createTempDir();
+
+    try {
+      final File logFile = new File(tmpDir, "log");
+
+      azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log");
+      EasyMock.expectLastCall().andThrow(new IOException());
+
+      replayAll();
+
+      azureTaskLogs.pushTaskLog(TASK_ID, logFile);
+
+      verifyAll();
+    }
+    finally {
+      FileUtils.deleteDirectory(tmpDir);
+    }
+  }
+
+  @Test
+  public void test_PushTaskReports_uploadsBlob() throws Exception
+  {
+    final File tmpDir = FileUtils.createTempDir();
+
+    try {
+      final File logFile = new File(tmpDir, "log");
+
+      azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json");
+      EasyMock.expectLastCall();
+
+      replayAll();
+
+      azureTaskLogs.pushTaskReports(TASK_ID, logFile);
+
+      verifyAll();
+    }
+    finally {
+      FileUtils.deleteDirectory(tmpDir);
+    }
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void test_PushTaskReports_exception_rethrowsException() throws Exception
+  {
+    final File tmpDir = FileUtils.createTempDir();
+
+    try {
+      final File logFile = new File(tmpDir, "log");
+
+      azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json");
+      EasyMock.expectLastCall().andThrow(new IOException());
+
+      replayAll();
+
+      azureTaskLogs.pushTaskReports(TASK_ID, logFile);
+
+      verifyAll();
+    }
+    finally {
+      FileUtils.deleteDirectory(tmpDir);
+    }
+  }
+
   @Test
   public void testStreamTaskLogWithoutOffset() throws Exception
   {
@@ -144,4 +214,99 @@ public class AzureTaskLogsTest extends EasyMockSupport

     verifyAll();
   }
+
+  @Test
+  public void test_streamTaskReports_blobExists_succeeds() throws Exception
+  {
+    final String testLog = "hello this is a log";
+
+    final String blobPath = PREFIX + "/" + TASK_ID + "/report.json";
+    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
+    EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
+    EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andReturn(
+        new ByteArrayInputStream(testLog.getBytes(StandardCharsets.UTF_8)));
+
+    replayAll();
+
+    final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskReports(TASK_ID);
+
+    final StringWriter writer = new StringWriter();
+    IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+    Assert.assertEquals(writer.toString(), testLog);
+
+    verifyAll();
+  }
+
+  @Test
+  public void test_streamTaskReports_blobDoesNotExist_returnsAbsent() throws Exception
+  {
+    final String testLog = "hello this is a log";
+
+    final String blobPath = PREFIX + "/" + TASK_ID_NOT_FOUND + "/report.json";
+    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(false);
+
+    replayAll();
+
+    final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskReports(TASK_ID_NOT_FOUND);
+
+    Assert.assertFalse(byteSource.isPresent());
+
+    verifyAll();
+  }
+
+  @Test(expected = IOException.class)
+  public void test_streamTaskReports_exceptionWhenGettingStream_throwsException() throws Exception
+  {
+    final String testLog = "hello this is a log";
+
+    final String blobPath = PREFIX + "/" + TASK_ID + "/report.json";
+    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
+    EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
+    EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andThrow(
+        new URISyntaxException("", ""));
+
+    replayAll();
+
+    final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskReports(TASK_ID);
+
+    final StringWriter writer = new StringWriter();
+    IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+    verifyAll();
+  }
+
+  @Test(expected = IOException.class)
+  public void test_streamTaskReports_exceptionWhenCheckingBlobExistence_throwsException() throws Exception
+  {
+    final String testLog = "hello this is a log";
+
+    final String blobPath = PREFIX + "/" + TASK_ID + "/report.json";
+    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andThrow(new URISyntaxException("", ""));
+
+    replayAll();
+
+    azureTaskLogs.streamTaskReports(TASK_ID);
+
+    verifyAll();
+  }
+
+  @Test (expected = UnsupportedOperationException.class)
+  public void test_killAll_throwsUnsupportedOperationException()
+  {
+    azureTaskLogs.killAll();
+  }
+
+  @Test (expected = UnsupportedOperationException.class)
+  public void test_killOlderThan_throwsUnsupportedOperationException()
+  {
+    azureTaskLogs.killOlderThan(0);
+  }
+
+  @After
+  public void cleanup()
+  {
+    resetAll();
+  }
 }