diff --git a/extensions-contrib/azure-extensions/pom.xml b/extensions-contrib/azure-extensions/pom.xml
index 2fac7327436..ebf1dc60c1a 100644
--- a/extensions-contrib/azure-extensions/pom.xml
+++ b/extensions-contrib/azure-extensions/pom.xml
@@ -45,7 +45,7 @@
com.microsoft.azure
azure-storage
- 2.1.0
+ 8.6.0
org.slf4j
@@ -59,12 +59,17 @@
org.apache.commons
commons-lang3
+
+ com.google.guava
+ guava
+
com.fasterxml.jackson.module
jackson-module-guice
${jackson.version}
+ provided
commons-io
@@ -84,6 +89,7 @@
com.google.inject
guice
+ provided
com.fasterxml.jackson.core
@@ -103,6 +109,7 @@
com.google.guava
guava
+ provided
javax.validation
@@ -135,4 +142,66 @@
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+ 0.8.4
+
+
+
+ org/apache/druid/firehose/azure/**/*
+
+
+
+ BUNDLE
+
+
+ INSTRUCTION
+ COVEREDRATIO
+ 0.79
+
+
+ LINE
+ COVEREDRATIO
+ 0.80
+
+
+ BRANCH
+ COVEREDRATIO
+ 0.70
+
+
+ COMPLEXITY
+ COVEREDRATIO
+ 0.73
+
+
+ METHOD
+ COVEREDRATIO
+ 0.76
+
+
+ CLASS
+ COVEREDRATIO
+ 0.83
+
+
+
+
+
+
+
+ report
+ test
+
+ report
+ check
+
+
+
+
+
+
diff --git a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
index 2d2a8bc81b2..e7ddc08c2fe 100644
--- a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
+++ b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
@@ -19,6 +19,7 @@
package org.apache.druid.storage.azure;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
@@ -40,8 +41,8 @@ import java.util.Map;
public class AzureDataSegmentPusher implements DataSegmentPusher
{
-
private static final Logger log = new Logger(AzureDataSegmentPusher.class);
+ static final List ALLOWED_PROPERTY_PREFIXES_FOR_HADOOP = ImmutableList.of("druid.azure");
private final AzureStorage azureStorage;
private final AzureAccountConfig config;
@@ -103,37 +104,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
@Override
public List getAllowedPropertyPrefixesForHadoop()
{
- return ImmutableList.of("druid.azure");
- }
-
- public String getAzurePath(final DataSegment segment, final boolean useUniquePath)
- {
- final String storageDir = this.getStorageDir(segment, useUniquePath);
-
- return StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME);
-
- }
-
- public DataSegment uploadDataSegment(
- DataSegment segment,
- final int binaryVersion,
- final long size,
- final File compressedSegmentData,
- final String azurePath
- )
- throws StorageException, IOException, URISyntaxException
- {
- azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePath);
-
- final DataSegment outSegment = segment
- .withSize(size)
- .withLoadSpec(this.makeLoadSpec(new URI(azurePath)))
- .withBinaryVersion(binaryVersion);
-
- log.info("Deleting file [%s]", compressedSegmentData);
- compressedSegmentData.delete();
-
- return outSegment;
+ return ALLOWED_PROPERTY_PREFIXES_FOR_HADOOP;
}
@Override
@@ -179,4 +150,36 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
uri.toString()
);
}
+
+ @VisibleForTesting
+ String getAzurePath(final DataSegment segment, final boolean useUniquePath)
+ {
+ final String storageDir = this.getStorageDir(segment, useUniquePath);
+
+ return StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME);
+
+ }
+
+ @VisibleForTesting
+ DataSegment uploadDataSegment(
+ DataSegment segment,
+ final int binaryVersion,
+ final long size,
+ final File compressedSegmentData,
+ final String azurePath
+ )
+ throws StorageException, IOException, URISyntaxException
+ {
+ azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePath);
+
+ final DataSegment outSegment = segment
+ .withSize(size)
+ .withLoadSpec(this.makeLoadSpec(new URI(azurePath)))
+ .withBinaryVersion(binaryVersion);
+
+ log.debug("Deleting file [%s]", compressedSegmentData);
+ compressedSegmentData.delete();
+
+ return outSegment;
+ }
}
diff --git a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
index 79ab48e793c..4a8f017f814 100644
--- a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
+++ b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
@@ -19,6 +19,7 @@
package org.apache.druid.storage.azure;
+import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
@@ -48,20 +49,11 @@ public class AzureStorage
this.cloudBlobClient = cloudBlobClient;
}
- public CloudBlobContainer getCloudBlobContainer(final String containerName)
- throws StorageException, URISyntaxException
- {
- CloudBlobContainer cloudBlobContainer = cloudBlobClient.getContainerReference(containerName);
- cloudBlobContainer.createIfNotExists();
-
- return cloudBlobContainer;
- }
-
public List emptyCloudBlobDirectory(final String containerName, final String virtualDirPath)
throws StorageException, URISyntaxException
{
List deletedFiles = new ArrayList<>();
- CloudBlobContainer container = getCloudBlobContainer(containerName);
+ CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
for (ListBlobItem blobItem : container.listBlobs(virtualDirPath, true, null, null, null)) {
CloudBlob cloudBlob = (CloudBlob) blobItem;
@@ -81,7 +73,7 @@ public class AzureStorage
public void uploadBlob(final File file, final String containerName, final String blobPath)
throws IOException, StorageException, URISyntaxException
{
- CloudBlobContainer container = getCloudBlobContainer(containerName);
+ CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
try (FileInputStream stream = new FileInputStream(file)) {
container.getBlockBlobReference(blobPath).upload(stream, file.length());
}
@@ -90,18 +82,39 @@ public class AzureStorage
public long getBlobLength(final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
- return getCloudBlobContainer(containerName).getBlockBlobReference(blobPath).getProperties().getLength();
+ return getOrCreateCloudBlobContainer(containerName).getBlockBlobReference(blobPath).getProperties().getLength();
}
public InputStream getBlobInputStream(final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
- CloudBlobContainer container = getCloudBlobContainer(containerName);
- return container.getBlockBlobReference(blobPath).openInputStream();
+ return getBlobInputStream(0L, containerName, blobPath);
+ }
+
+ public InputStream getBlobInputStream(long offset, final String containerName, final String blobPath)
+ throws URISyntaxException, StorageException
+ {
+ CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
+ return container.getBlockBlobReference(blobPath).openInputStream(offset, null, null, null, null);
}
public boolean getBlobExists(String container, String blobPath) throws URISyntaxException, StorageException
{
- return getCloudBlobContainer(container).getBlockBlobReference(blobPath).exists();
+ return getOrCreateCloudBlobContainer(container).getBlockBlobReference(blobPath).exists();
+ }
+
+ @VisibleForTesting
+ CloudBlobClient getCloudBlobClient()
+ {
+ return this.cloudBlobClient;
+ }
+
+ private CloudBlobContainer getOrCreateCloudBlobContainer(final String containerName)
+ throws StorageException, URISyntaxException
+ {
+ CloudBlobContainer cloudBlobContainer = cloudBlobClient.getContainerReference(containerName);
+ cloudBlobContainer.createIfNotExists();
+
+ return cloudBlobContainer;
}
}
diff --git a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
index e5eee086fc3..33761da07eb 100644
--- a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
+++ b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
@@ -42,7 +42,7 @@ import java.util.List;
public class AzureStorageDruidModule implements DruidModule
{
- public static final String SCHEME = "azure";
+ static final String SCHEME = "azure";
public static final String STORAGE_CONNECTION_STRING = "DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s";
public static final String INDEX_ZIP_FILE_NAME = "index.zip";
diff --git a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
index 827d5538396..3ad3842f89f 100644
--- a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
+++ b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.storage.azure;
import com.google.common.collect.ImmutableMap;
import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.StorageExtendedErrorInformation;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
@@ -52,6 +53,9 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
1
);
+ private static final StorageExtendedErrorInformation NULL_STORAGE_EXTENDED_ERROR_INFORMATION = null;
+ private static final StorageExtendedErrorInformation STORAGE_EXTENDED_ERROR_INFORMATION = new StorageExtendedErrorInformation();
+
private AzureStorage azureStorage;
@Before
@@ -79,9 +83,54 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
}
@Test(expected = SegmentLoadingException.class)
- public void killWithErrorTest() throws SegmentLoadingException, URISyntaxException, StorageException
+ public void test_kill_StorageExceptionExtendedErrorInformationNull_throwsException()
+ throws SegmentLoadingException, URISyntaxException, StorageException
{
+ common_test_kill_StorageExceptionExtendedError_throwsException(NULL_STORAGE_EXTENDED_ERROR_INFORMATION);
+ }
+
+ @Test(expected = SegmentLoadingException.class)
+ public void test_kill_StorageExceptionExtendedErrorInformationNotNull_throwsException()
+ throws SegmentLoadingException, URISyntaxException, StorageException
+ {
+
+ common_test_kill_StorageExceptionExtendedError_throwsException(STORAGE_EXTENDED_ERROR_INFORMATION);
+ }
+
+ @Test(expected = SegmentLoadingException.class)
+ public void test_kill_URISyntaxException_throwsException()
+ throws SegmentLoadingException, URISyntaxException, StorageException
+ {
+
+ String dirPath = Paths.get(BLOB_PATH).getParent().toString();
+
+ EasyMock.expect(azureStorage.emptyCloudBlobDirectory(CONTAINER_NAME, dirPath)).andThrow(
+ new URISyntaxException(
+ "",
+ ""
+ )
+ );
+
+ replayAll();
+
+ AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage);
+
+ killer.kill(DATA_SEGMENT);
+
+ verifyAll();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void test_killAll_throwsUnsupportedOperationException()
+ {
+ AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage);
+ killer.killAll();
+ }
+
+ private void common_test_kill_StorageExceptionExtendedError_throwsException(StorageExtendedErrorInformation storageExtendedErrorInformation)
+ throws SegmentLoadingException, URISyntaxException, StorageException
+ {
String dirPath = Paths.get(BLOB_PATH).getParent().toString();
EasyMock.expect(azureStorage.emptyCloudBlobDirectory(CONTAINER_NAME, dirPath)).andThrow(
@@ -89,7 +138,7 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
"",
"",
400,
- null,
+ storageExtendedErrorInformation,
null
)
);
diff --git a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
index faf92cbd03b..6851e7e8c2b 100644
--- a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
+++ b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
@@ -39,7 +39,8 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
private static final String SEGMENT_FILE_NAME = "segment";
private static final String CONTAINER_NAME = "container";
- private static final String BLOB_PATH = "/path/to/storage/index.zip";
+ private static final String BLOB_PATH = "path/to/storage/index.zip";
+ private static final String BLOB_PATH_HADOOP = AzureDataSegmentPuller.AZURE_STORAGE_HOST_ADDRESS + "/path/to/storage/index.zip";
private AzureStorage azureStorage;
@Before
@@ -49,7 +50,8 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
}
@Test
- public void testZIPUncompress() throws SegmentLoadingException, URISyntaxException, StorageException, IOException
+ public void test_getSegmentFiles_success()
+ throws SegmentLoadingException, URISyntaxException, StorageException, IOException
{
final String value = "bucket";
final File pulledFile = AzureTestUtils.createZipTempFile(SEGMENT_FILE_NAME, value);
@@ -78,8 +80,39 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
}
}
+ @Test
+ public void test_getSegmentFiles_blobPathIsHadoop_success()
+ throws SegmentLoadingException, URISyntaxException, StorageException, IOException
+ {
+ final String value = "bucket";
+ final File pulledFile = AzureTestUtils.createZipTempFile(SEGMENT_FILE_NAME, value);
+ final File toDir = FileUtils.createTempDir();
+ try {
+ final InputStream zipStream = new FileInputStream(pulledFile);
+
+ EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream);
+
+ replayAll();
+
+ AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);
+
+ FileUtils.FileCopyResult result = puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH_HADOOP, toDir);
+
+ File expected = new File(toDir, SEGMENT_FILE_NAME);
+ Assert.assertEquals(value.length(), result.size());
+ Assert.assertTrue(expected.exists());
+ Assert.assertEquals(value.length(), expected.length());
+
+ verifyAll();
+ }
+ finally {
+ pulledFile.delete();
+ FileUtils.deleteDirectory(toDir);
+ }
+ }
+
@Test(expected = RuntimeException.class)
- public void testDeleteOutputDirectoryWhenErrorIsRaisedPullingSegmentFiles()
+ public void test_getSegmentFiles_nonRecoverableErrorRaisedWhenPullingSegmentFiles_doNotDeleteOutputDirectory()
throws IOException, URISyntaxException, StorageException, SegmentLoadingException
{
@@ -98,11 +131,43 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);
puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, outDir);
+ }
+ catch (Exception e) {
+ Assert.assertTrue(outDir.exists());
+ verifyAll();
+ throw e;
+ }
+ finally {
+ FileUtils.deleteDirectory(outDir);
+ }
+ }
+
+ @Test(expected = SegmentLoadingException.class)
+ public void test_getSegmentFiles_recoverableErrorRaisedWhenPullingSegmentFiles_deleteOutputDirectory()
+ throws IOException, URISyntaxException, StorageException, SegmentLoadingException
+ {
+
+ final File outDir = FileUtils.createTempDir();
+ try {
+ EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER_NAME, BLOB_PATH)).andThrow(
+ new StorageException(null, null, 0, null, null)
+ ).atLeastOnce();
+
+ replayAll();
+
+ AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);
+
+ puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, outDir);
Assert.assertFalse(outDir.exists());
verifyAll();
}
+ catch (Exception e) {
+ Assert.assertFalse(outDir.exists());
+ verifyAll();
+ throw e;
+ }
finally {
FileUtils.deleteDirectory(outDir);
}
diff --git a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java
index 2f1867df7d6..7da2339ab63 100644
--- a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java
+++ b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java
@@ -42,6 +42,7 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
@@ -63,6 +64,21 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
0,
1
);
+ private static final byte[] DATA = new byte[]{0x0, 0x0, 0x0, 0x1};
+ private static final String UNIQUE_MATCHER = "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip";
+ private static final String NON_UNIQUE_MATCHER = "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/index\\.zip";
+
+ private static final DataSegment SEGMENT_TO_PUSH = new DataSegment(
+ "foo",
+ Intervals.of("2015/2016"),
+ "0",
+ new HashMap<>(),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ NoneShardSpec.instance(),
+ 0,
+ DATA.length
+ );
private AzureStorage azureStorage;
private AzureAccountConfig azureAccountConfig;
@@ -80,48 +96,91 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
}
@Test
- public void testPush() throws Exception
- {
- testPushInternal(false, "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/index\\.zip");
- }
-
- @Test
- public void testPushUseUniquePath() throws Exception
- {
- testPushInternal(true, "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip");
- }
-
- private void testPushInternal(boolean useUniquePath, String matcher) throws Exception
+ public void test_push_nonUniquePath_succeeds() throws Exception
{
+ boolean useUniquePath = false;
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
// Create a mock segment on disk
File tmp = tempFolder.newFile("version.bin");
- final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
- Files.write(data, tmp);
- final long size = data.length;
+ Files.write(DATA, tmp);
- DataSegment segmentToPush = new DataSegment(
- "foo",
- Intervals.of("2015/2016"),
- "0",
- new HashMap<>(),
- new ArrayList<>(),
- new ArrayList<>(),
- NoneShardSpec.instance(),
- 0,
- size
- );
+ String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
+ azureStorage.uploadBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
+ EasyMock.expectLastCall();
- DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, useUniquePath);
+ replayAll();
+
+ DataSegment segment = pusher.push(tempFolder.getRoot(), SEGMENT_TO_PUSH, useUniquePath);
Assert.assertTrue(
segment.getLoadSpec().get("blobPath").toString(),
- Pattern.compile(matcher).matcher(segment.getLoadSpec().get("blobPath").toString()).matches()
+ Pattern.compile(NON_UNIQUE_MATCHER).matcher(segment.getLoadSpec().get("blobPath").toString()).matches()
);
- Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
+ Assert.assertEquals(SEGMENT_TO_PUSH.getSize(), segment.getSize());
+
+ verifyAll();
+ }
+
+ @Test
+ public void test_push_uniquePath_succeeds() throws Exception
+ {
+ boolean useUniquePath = true;
+ AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
+
+ // Create a mock segment on disk
+ File tmp = tempFolder.newFile("version.bin");
+
+ Files.write(DATA, tmp);
+
+ String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
+ azureStorage.uploadBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.matches(UNIQUE_MATCHER));
+ EasyMock.expectLastCall();
+
+ replayAll();
+
+ DataSegment segment = pusher.push(tempFolder.getRoot(), SEGMENT_TO_PUSH, useUniquePath);
+
+ Assert.assertTrue(
+ segment.getLoadSpec().get("blobPath").toString(),
+ Pattern.compile(UNIQUE_MATCHER).matcher(segment.getLoadSpec().get("blobPath").toString()).matches()
+ );
+
+ Assert.assertEquals(SEGMENT_TO_PUSH.getSize(), segment.getSize());
+
+ verifyAll();
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void test_push_exception_throwsException() throws Exception
+ {
+ boolean useUniquePath = true;
+ AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
+
+ // Create a mock segment on disk
+ File tmp = tempFolder.newFile("version.bin");
+
+ Files.write(DATA, tmp);
+ final long size = DATA.length;
+
+ String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
+ azureStorage.uploadBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
+ EasyMock.expectLastCall().andThrow(new URISyntaxException("", ""));
+
+ replayAll();
+
+ DataSegment segment = pusher.push(tempFolder.getRoot(), SEGMENT_TO_PUSH, useUniquePath);
+
+ Assert.assertTrue(
+ segment.getLoadSpec().get("blobPath").toString(),
+ Pattern.compile(UNIQUE_MATCHER).matcher(segment.getLoadSpec().get("blobPath").toString()).matches()
+ );
+
+ Assert.assertEquals(SEGMENT_TO_PUSH.getSize(), segment.getSize());
+
+ verifyAll();
}
@Test
@@ -176,6 +235,22 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
Assert.assertEquals("wasbs://container@account.blob.core.windows.net/", hadoopPath);
}
+ @Test
+ public void test_getPathForHadoop_noArgs_succeeds()
+ {
+ AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
+ String hadoopPath = pusher.getPathForHadoop("");
+ Assert.assertEquals("wasbs://container@account.blob.core.windows.net/", hadoopPath);
+ }
+
+ @Test
+ public void test_getAllowedPropertyPrefixesForHadoop_returnsExpcetedPropertyPrefixes()
+ {
+ AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
+ List actualPropertyPrefixes = pusher.getAllowedPropertyPrefixesForHadoop();
+ Assert.assertEquals(AzureDataSegmentPusher.ALLOWED_PROPERTY_PREFIXES_FOR_HADOOP, actualPropertyPrefixes);
+ }
+
@Test
public void storageDirContainsNoColonsTest()
{
diff --git a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java
new file mode 100644
index 00000000000..311dcae6c5a
--- /dev/null
+++ b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.JsonConfigurator;
+import org.apache.druid.guice.LazySingleton;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.validation.Validation;
+import javax.validation.Validator;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Properties;
+
+public class AzureStorageDruidModuleTest
+{
+ private static final String AZURE_ACCOUNT_NAME;
+ private static final String AZURE_ACCOUNT_KEY;
+ private static final String AZURE_CONTAINER;
+ private Injector injector;
+
+ static {
+ try {
+ AZURE_ACCOUNT_NAME = "azureAccount1";
+ AZURE_ACCOUNT_KEY = Base64.getUrlEncoder()
+ .encodeToString("azureKey1".getBytes(StandardCharsets.UTF_8.toString()));
+ AZURE_CONTAINER = "azureContainer1";
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void test_getBlobClient_expectedClient()
+ {
+ final Properties props = new Properties();
+ props.put("druid.azure.account", AZURE_ACCOUNT_NAME);
+ props.put("druid.azure.key", AZURE_ACCOUNT_KEY);
+ props.put("druid.azure.container", AZURE_CONTAINER);
+ injector = makeInjectorWithProperties(props);
+ AzureAccountConfig azureAccountConfig = injector.getInstance(Key.get(AzureAccountConfig.class));
+
+ Assert.assertEquals(AZURE_ACCOUNT_NAME, azureAccountConfig.getAccount());
+ Assert.assertEquals(AZURE_ACCOUNT_KEY, azureAccountConfig.getKey());
+ Assert.assertEquals(AZURE_CONTAINER, azureAccountConfig.getContainer());
+
+ CloudBlobClient cloudBlobClient = injector.getInstance(CloudBlobClient.class);
+ StorageCredentials storageCredentials = cloudBlobClient.getCredentials();
+
+ Assert.assertEquals(AZURE_ACCOUNT_NAME, storageCredentials.getAccountName());
+ }
+
+ @Test
+ public void test_getAzureStorageContainer_expectedClient()
+ {
+ final Properties props = new Properties();
+ props.put("druid.azure.account", AZURE_ACCOUNT_NAME);
+ props.put("druid.azure.key", AZURE_ACCOUNT_KEY);
+ props.put("druid.azure.container", AZURE_CONTAINER);
+ injector = makeInjectorWithProperties(props);
+ AzureAccountConfig azureAccountConfig = injector.getInstance(Key.get(AzureAccountConfig.class));
+
+ Assert.assertEquals(AZURE_ACCOUNT_NAME, azureAccountConfig.getAccount());
+ Assert.assertEquals(AZURE_ACCOUNT_KEY, azureAccountConfig.getKey());
+ Assert.assertEquals(AZURE_CONTAINER, azureAccountConfig.getContainer());
+
+ CloudBlobClient cloudBlobClient = injector.getInstance(CloudBlobClient.class);
+ StorageCredentials storageCredentials = cloudBlobClient.getCredentials();
+
+ Assert.assertEquals(AZURE_ACCOUNT_NAME, storageCredentials.getAccountName());
+
+ AzureStorage azureStorage = injector.getInstance(AzureStorage.class);
+ Assert.assertSame(cloudBlobClient, azureStorage.getCloudBlobClient());
+ }
+
+ private Injector makeInjectorWithProperties(final Properties props)
+ {
+ return Guice.createInjector(
+ ImmutableList.of(
+ new DruidGuiceExtensions(),
+ new Module()
+ {
+ @Override
+ public void configure(Binder binder)
+ {
+ binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
+ binder.bind(JsonConfigurator.class).in(LazySingleton.class);
+ binder.bind(Properties.class).toInstance(props);
+ }
+ },
+ new AzureStorageDruidModule()
+ ));
+ }
+}
diff --git a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
index 2d6f6dbb620..a3733afdd93 100644
--- a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
+++ b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
@@ -26,13 +26,16 @@ import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.IOException;
import java.io.StringWriter;
+import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
public class AzureTaskLogsTest extends EasyMockSupport
@@ -41,6 +44,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
private static final String CONTAINER = "test";
private static final String PREFIX = "test/log";
private static final String TASK_ID = "taskid";
+ private static final String TASK_ID_NOT_FOUND = "taskidNotFound";
private static final AzureTaskLogsConfig AZURE_TASK_LOGS_CONFIG = new AzureTaskLogsConfig(CONTAINER, PREFIX, 3);
private AzureStorage azureStorage;
@@ -55,7 +59,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
@Test
- public void testPushTaskLog() throws Exception
+ public void test_PushTaskLog_uploadsBlob() throws Exception
{
final File tmpDir = FileUtils.createTempDir();
@@ -76,6 +80,72 @@ public class AzureTaskLogsTest extends EasyMockSupport
}
}
+ @Test(expected = RuntimeException.class)
+ public void test_PushTaskLog_exception_rethrowsException() throws Exception
+ {
+ final File tmpDir = FileUtils.createTempDir();
+
+ try {
+ final File logFile = new File(tmpDir, "log");
+
+ azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log");
+ EasyMock.expectLastCall().andThrow(new IOException());
+
+ replayAll();
+
+ azureTaskLogs.pushTaskLog(TASK_ID, logFile);
+
+ verifyAll();
+ }
+ finally {
+ FileUtils.deleteDirectory(tmpDir);
+ }
+ }
+
+ @Test
+ public void test_PushTaskReports_uploadsBlob() throws Exception
+ {
+ final File tmpDir = FileUtils.createTempDir();
+
+ try {
+ final File logFile = new File(tmpDir, "log");
+
+ azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json");
+ EasyMock.expectLastCall();
+
+ replayAll();
+
+ azureTaskLogs.pushTaskReports(TASK_ID, logFile);
+
+ verifyAll();
+ }
+ finally {
+ FileUtils.deleteDirectory(tmpDir);
+ }
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void test_PushTaskReports_exception_rethrowsException() throws Exception
+ {
+ final File tmpDir = FileUtils.createTempDir();
+
+ try {
+ final File logFile = new File(tmpDir, "log");
+
+ azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json");
+ EasyMock.expectLastCall().andThrow(new IOException());
+
+ replayAll();
+
+ azureTaskLogs.pushTaskReports(TASK_ID, logFile);
+
+ verifyAll();
+ }
+ finally {
+ FileUtils.deleteDirectory(tmpDir);
+ }
+ }
+
@Test
public void testStreamTaskLogWithoutOffset() throws Exception
{
@@ -144,4 +214,99 @@ public class AzureTaskLogsTest extends EasyMockSupport
verifyAll();
}
+
+ @Test
+ public void test_streamTaskReports_blobExists_succeeds() throws Exception
+ {
+ final String testLog = "hello this is a log";
+
+ final String blobPath = PREFIX + "/" + TASK_ID + "/report.json";
+ EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
+ EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
+ EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andReturn(
+ new ByteArrayInputStream(testLog.getBytes(StandardCharsets.UTF_8)));
+
+
+ replayAll();
+
+ final Optional byteSource = azureTaskLogs.streamTaskReports(TASK_ID);
+
+ final StringWriter writer = new StringWriter();
+ IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+ Assert.assertEquals(writer.toString(), testLog);
+
+ verifyAll();
+ }
+
+ @Test
+ public void test_streamTaskReports_blobDoesNotExist_returnsAbsent() throws Exception
+ {
+ final String testLog = "hello this is a log";
+
+ final String blobPath = PREFIX + "/" + TASK_ID_NOT_FOUND + "/report.json";
+ EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(false);
+
+ replayAll();
+
+ final Optional byteSource = azureTaskLogs.streamTaskReports(TASK_ID_NOT_FOUND);
+
+
+ Assert.assertFalse(byteSource.isPresent());
+
+ verifyAll();
+ }
+
+ @Test(expected = IOException.class)
+ public void test_streamTaskReports_exceptionWhenGettingStream_throwsException() throws Exception
+ {
+ final String testLog = "hello this is a log";
+
+ final String blobPath = PREFIX + "/" + TASK_ID + "/report.json";
+ EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
+ EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
+ EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andThrow(
+ new URISyntaxException("", ""));
+
+
+ replayAll();
+
+ final Optional byteSource = azureTaskLogs.streamTaskReports(TASK_ID);
+
+ final StringWriter writer = new StringWriter();
+ IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+ verifyAll();
+ }
+
+ @Test(expected = IOException.class)
+ public void test_streamTaskReports_exceptionWhenCheckingBlobExistence_throwsException() throws Exception
+ {
+ final String testLog = "hello this is a log";
+
+ final String blobPath = PREFIX + "/" + TASK_ID + "/report.json";
+ EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andThrow(new URISyntaxException("", ""));
+
+ replayAll();
+
+ azureTaskLogs.streamTaskReports(TASK_ID);
+
+ verifyAll();
+ }
+
+ @Test (expected = UnsupportedOperationException.class)
+ public void test_killAll_throwsUnsupportedOperationException()
+ {
+ azureTaskLogs.killAll();
+ }
+
+ @Test (expected = UnsupportedOperationException.class)
+ public void test_killOlderThan_throwsUnsupportedOperationException()
+ {
+ azureTaskLogs.killOlderThan(0);
+ }
+
+ @After
+ public void cleanup()
+ {
+ resetAll();
+ }
}