diff --git a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureByteSource.java b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureByteSource.java
index 98bb7f15145..2bc4599c27a 100644
--- a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureByteSource.java
+++ b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureByteSource.java
@@ -28,23 +28,26 @@ import java.net.URISyntaxException;
 
 public class AzureByteSource extends ByteSource
 {
-  final private AzureStorageContainer azureStorageContainer;
-  final private String filePath;
+  final private AzureStorage azureStorage;
+  final private String containerName;
+  final private String blobPath;
 
   public AzureByteSource(
-      AzureStorageContainer azureStorageContainer,
-      String filePath
+      AzureStorage azureStorage,
+      String containerName,
+      String blobPath
   )
   {
-    this.azureStorageContainer = azureStorageContainer;
-    this.filePath = filePath;
+    this.azureStorage = azureStorage;
+    this.containerName = containerName;
+    this.blobPath = blobPath;
   }
 
   @Override
   public InputStream openStream() throws IOException
   {
     try {
-      return azureStorageContainer.getBlobInputStream(filePath);
+      return azureStorage.getBlobInputStream(containerName, blobPath);
     }
     catch (StorageException | URISyntaxException e) {
       if (AzureUtils.AZURE_RETRY.apply(e)) {
diff --git a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentKiller.java b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentKiller.java
index fda9bf1b6ef..5517e0d0332 100644
--- a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentKiller.java
+++ b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentKiller.java
@@ -26,20 +26,21 @@ import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.timeline.DataSegment;
 
 import java.net.URISyntaxException;
+import java.nio.file.Paths;
 import java.util.Map;
 
 public class AzureDataSegmentKiller implements DataSegmentKiller
 {
   private static final Logger log = new Logger(AzureDataSegmentKiller.class);
 
-  private final AzureStorageContainer azureStorageContainer;
+  private final AzureStorage azureStorage;
 
   @Inject
   public AzureDataSegmentKiller(
-      final AzureStorageContainer azureStorageContainer
+      final AzureStorage azureStorage
   )
   {
-    this.azureStorageContainer = azureStorageContainer;
+    this.azureStorage = azureStorage;
   }
 
   @Override
@@ -48,10 +49,12 @@ public class AzureDataSegmentKiller implements DataSegmentKiller
     log.info("Killing segment [%s]", segment);
 
     Map<String, Object> loadSpec = segment.getLoadSpec();
-    String storageDir = MapUtils.getString(loadSpec, "storageDir");
+    final String containerName = MapUtils.getString(loadSpec, "containerName");
+    final String blobPath = MapUtils.getString(loadSpec, "blobPath");
+    final String dirPath = Paths.get(blobPath).getParent().toString();
 
     try {
-      azureStorageContainer.emptyCloudBlobDirectory(storageDir);
+      azureStorage.emptyCloudBlobDirectory(containerName, dirPath);
     }
     catch (StorageException | URISyntaxException e) {
       throw new SegmentLoadingException(e, "Couldn't kill segment[%s]", segment.getIdentifier());
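Reviewer note: because the load spec now records the full path of the index.zip blob instead of a storage directory, the killer has to derive the directory to empty from the blob path's parent. A minimal sketch of that derivation, reusing the example path from the tests further down; the path literal is a placeholder and the printed result assumes POSIX-style separators:

```java
import java.nio.file.Paths;

public class BlobDirPathSketch
{
  public static void main(String[] args)
  {
    // Placeholder blobPath, mirroring the value used in AzureDataSegmentKillerTest.
    final String blobPath = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";

    // Same derivation as AzureDataSegmentKiller.kill(): drop the file name, keep the virtual directory.
    final String dirPath = Paths.get(blobPath).getParent().toString();

    // Prints test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0 on a POSIX file system.
    System.out.println(dirPath);
  }
}
```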
diff --git a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPuller.java b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPuller.java
index af8879ccf99..8f2f058c057 100644
--- a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPuller.java
+++ b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPuller.java
@@ -36,25 +36,28 @@ public class AzureDataSegmentPuller implements DataSegmentPuller
 {
   private static final Logger log = new Logger(AzureDataSegmentPuller.class);
 
-  private final AzureStorageContainer azureStorageContainer;
+  private final AzureStorage azureStorage;
 
   @Inject
   public AzureDataSegmentPuller(
-      AzureStorageContainer azureStorageContainer
+      AzureStorage azureStorage
   )
   {
-    this.azureStorageContainer = azureStorageContainer;
+    this.azureStorage = azureStorage;
   }
 
-  public com.metamx.common.FileUtils.FileCopyResult getSegmentFiles(final String storageDir, final File outDir)
+  public com.metamx.common.FileUtils.FileCopyResult getSegmentFiles(
+      final String containerName,
+      final String blobPath,
+      final File outDir
+  )
       throws SegmentLoadingException
   {
     prepareOutDir(outDir);
 
     try {
-      final String filePath = String.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME);
-      final ByteSource byteSource = new AzureByteSource(azureStorageContainer, filePath);
+      final ByteSource byteSource = new AzureByteSource(azureStorage, containerName, blobPath);
       final com.metamx.common.FileUtils.FileCopyResult result = CompressionUtils.unzip(
           byteSource,
           outDir,
@@ -62,7 +65,7 @@ public class AzureDataSegmentPuller implements DataSegmentPuller
           true
       );
-      log.info("Loaded %d bytes from [%s] to [%s]", result.size(), filePath, outDir.getAbsolutePath());
+      log.info("Loaded %d bytes from [%s] to [%s]", result.size(), blobPath, outDir.getAbsolutePath());
 
       return result;
     }
     catch (IOException e) {
@@ -74,7 +77,7 @@ public class AzureDataSegmentPuller implements DataSegmentPuller
             ioe,
             "Failed to remove output directory [%s] for segment pulled from [%s]",
             outDir.getAbsolutePath(),
-            storageDir
+            blobPath
         );
       }
       throw new SegmentLoadingException(e, e.getMessage());
@@ -87,9 +90,10 @@ public class AzureDataSegmentPuller implements DataSegmentPuller
   {
     final Map<String, Object> loadSpec = segment.getLoadSpec();
-    final String storageDir = MapUtils.getString(loadSpec, "storageDir");
+    final String containerName = MapUtils.getString(loadSpec, "containerName");
+    final String blobPath = MapUtils.getString(loadSpec, "blobPath");
 
-    getSegmentFiles(storageDir, outDir);
+    getSegmentFiles(containerName, blobPath, outDir);
   }
 
   public void prepareOutDir(final File outDir) throws ISE
diff --git a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java
index f61b96222db..a23ced6c938 100644
--- a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java
+++ b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java
@@ -41,19 +41,19 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
 {
   private static final Logger log = new Logger(AzureDataSegmentPusher.class);
 
-  private final AzureStorageContainer azureStorageContainer;
-  private final AzureAccountConfig azureAccountConfig;
+  private final AzureStorage azureStorage;
+  private final AzureAccountConfig config;
   private final ObjectMapper jsonMapper;
 
   @Inject
   public AzureDataSegmentPusher(
-      AzureStorageContainer azureStorageContainer,
-      AzureAccountConfig azureAccountConfig,
+      AzureStorage azureStorage,
+      AzureAccountConfig config,
       ObjectMapper jsonMapper
   )
   {
-    this.azureStorageContainer = azureStorageContainer;
-    this.azureAccountConfig = azureAccountConfig;
+    this.azureStorage = azureStorage;
+    this.config = config;
     this.jsonMapper = jsonMapper;
   }
 
@@ -88,7 +88,6 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
     final String storageDir = DataSegmentPusherUtil.getStorageDir(segment);
 
     return ImmutableMap.of(
-        "storage", storageDir,
         "index", String.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME),
         "descriptor", String.format("%s/%s", storageDir, AzureStorageDruidModule.DESCRIPTOR_FILE_NAME)
     );
@@ -100,14 +99,6 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
     return RetryUtils.retry(f, AzureUtils.AZURE_RETRY, maxTries);
   }
 
-  public void uploadThenDelete(final File file, final String azurePath)
-      throws StorageException, IOException, URISyntaxException
-  {
-    azureStorageContainer.uploadBlob(file, azurePath);
-    log.info("Deleting file [%s]", file);
-    file.delete();
-  }
-
   public DataSegment uploadDataSegment(
       DataSegment segment,
       final int version,
@@ -117,20 +108,30 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
   )
       throws StorageException, IOException, URISyntaxException
   {
-    uploadThenDelete(compressedSegmentData, azurePaths.get("index"));
-    uploadThenDelete(descriptorFile, azurePaths.get("descriptor"));
+    azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePaths.get("index"));
+    azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor"));
 
-    return segment
+    final DataSegment outSegment = segment
         .withSize(compressedSegmentData.length())
         .withLoadSpec(
             ImmutableMap.of(
                 "type",
                 AzureStorageDruidModule.SCHEME,
-                "storageDir",
-                azurePaths.get("storage")
+                "containerName",
+                config.getContainer(),
+                "blobPath",
+                azurePaths.get("index")
             )
         )
         .withBinaryVersion(version);
+
+    log.info("Deleting file [%s]", compressedSegmentData);
+    compressedSegmentData.delete();
+
+    log.info("Deleting file [%s]", descriptorFile);
+    descriptorFile.delete();
+
+    return outSegment;
   }
 
   @Override
@@ -154,7 +155,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
             return uploadDataSegment(segment, version, compressedSegmentData, descriptorFile, azurePaths);
           }
         },
-        azureAccountConfig.getMaxTries()
+        config.getMaxTries()
     );
   }
   catch (Exception e) {
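Reviewer note: the pusher now writes a load spec keyed by containerName and blobPath, with blobPath pointing straight at the uploaded index.zip, while the old "storage" entry disappears from getAzurePaths(). A minimal sketch of the shape consumers see; the container name and path are placeholder values, and only the keys and MapUtils calls come from this patch:

```java
package io.druid.storage.azure;

import com.google.common.collect.ImmutableMap;
import com.metamx.common.MapUtils;

import java.util.Map;

public class LoadSpecShapeSketch
{
  public static void main(String[] args)
  {
    // Shape of the load spec written by uploadDataSegment() after this change
    // ("druid-container" and the path are placeholders).
    final Map<String, Object> loadSpec = ImmutableMap.<String, Object>of(
        "type", AzureStorageDruidModule.SCHEME,
        "containerName", "druid-container",
        "blobPath", "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip"
    );

    // Consumers (the killer and the puller) resolve the blob the same way:
    final String containerName = MapUtils.getString(loadSpec, "containerName");
    final String blobPath = MapUtils.getString(loadSpec, "blobPath");

    System.out.println(containerName + "/" + blobPath);
  }
}
```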
diff --git a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureLoadSpec.java b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureLoadSpec.java
index 4fe7e7b2113..3c3be716dd9 100644
--- a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureLoadSpec.java
+++ b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureLoadSpec.java
@@ -30,25 +30,32 @@ import java.io.File;
 @JsonTypeName(AzureStorageDruidModule.SCHEME)
 public class AzureLoadSpec implements LoadSpec
 {
+
   @JsonProperty
-  private final String storageDir;
+  private final String containerName;
+
+  @JsonProperty
+  private final String blobPath;
 
   private final AzureDataSegmentPuller puller;
 
   @JsonCreator
   public AzureLoadSpec(
-      @JacksonInject AzureDataSegmentPuller puller,
-      @JsonProperty("storageDir") String storageDir
+      @JsonProperty("containerName") String containerName,
+      @JsonProperty("blobPath") String blobPath,
+      @JacksonInject AzureDataSegmentPuller puller
   )
   {
-    Preconditions.checkNotNull(storageDir);
-    this.storageDir = storageDir;
+    Preconditions.checkNotNull(blobPath);
+    Preconditions.checkNotNull(containerName);
+    this.containerName = containerName;
+    this.blobPath = blobPath;
     this.puller = puller;
   }
 
   @Override
   public LoadSpecResult loadSegment(File file) throws SegmentLoadingException
   {
-    return new LoadSpecResult(puller.getSegmentFiles(storageDir, file).size());
+    return new LoadSpecResult(puller.getSegmentFiles(containerName, blobPath, file).size());
   }
 }
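Reviewer note: AzureLoadSpec is the Jackson-facing side of the same rename, so segment metadata now binds containerName and blobPath instead of storageDir. A minimal sketch of exercising the new constructor directly; the literals are placeholders, the puller is assumed to be obtained elsewhere, and the commented JSON only illustrates the shape implied by the @JsonProperty names:

```java
package io.druid.storage.azure;

import io.druid.segment.loading.SegmentLoadingException;

import java.io.File;

public class AzureLoadSpecSketch
{
  // Roughly the metadata shape this binds to (values are placeholders):
  //   {"type": "<AzureStorageDruidModule.SCHEME>", "containerName": "druid-container", "blobPath": ".../index.zip"}
  public static void loadInto(AzureDataSegmentPuller puller, File outDir) throws SegmentLoadingException
  {
    final AzureLoadSpec loadSpec = new AzureLoadSpec(
        "druid-container",                                                        // containerName (placeholder)
        "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip",   // blobPath (placeholder)
        puller
    );

    // Delegates to puller.getSegmentFiles(containerName, blobPath, outDir), as shown above.
    loadSpec.loadSegment(outDir);
  }
}
```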
diff --git a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorageContainer.java b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorage.java
similarity index 55%
rename from extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorageContainer.java
rename to extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorage.java
index 3206d54b8cb..b91af1c2eb1 100644
--- a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorageContainer.java
+++ b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorage.java
@@ -20,8 +20,8 @@ package io.druid.storage.azure;
 
 import com.metamx.common.logger.Logger;
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
-import com.microsoft.azure.storage.blob.CloudBlobDirectory;
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
 import com.microsoft.azure.storage.blob.ListBlobItem;
 
@@ -33,27 +33,36 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 
-public class AzureStorageContainer
+public class AzureStorage
 {
-  private final Logger log = new Logger(AzureStorageContainer.class);
+  private final Logger log = new Logger(AzureStorage.class);
 
-  private final CloudBlobContainer cloudBlobContainer;
+  private final CloudBlobClient cloudBlobClient;
 
-  public AzureStorageContainer(
-      CloudBlobContainer cloudBlobContainer
+  public AzureStorage(
+      CloudBlobClient cloudBlobClient
   )
   {
-    this.cloudBlobContainer = cloudBlobContainer;
+    this.cloudBlobClient = cloudBlobClient;
   }
 
-  public List<String> emptyCloudBlobDirectory(final String path)
+  public CloudBlobContainer getCloudBlobContainer(final String containerName)
+      throws StorageException, URISyntaxException
+  {
+    CloudBlobContainer cloudBlobContainer = cloudBlobClient.getContainerReference(containerName);
+    cloudBlobContainer.createIfNotExists();
+
+    return cloudBlobContainer;
+  }
+
+  public List<String> emptyCloudBlobDirectory(final String containerName, final String virtualDirPath)
       throws StorageException, URISyntaxException
   {
     List<String> deletedFiles = new ArrayList<>();
+    CloudBlobContainer container = getCloudBlobContainer(containerName);
 
-    for (ListBlobItem blobItem : cloudBlobContainer.listBlobs(path, true, null, null, null))
-    {
+    for (ListBlobItem blobItem : container.listBlobs(virtualDirPath, true, null, null, null)) {
       CloudBlob cloudBlob = (CloudBlob) blobItem;
       log.info("Removing file[%s] from Azure.", cloudBlob.getName());
       if (cloudBlob.deleteIfExists()) {
@@ -61,27 +70,30 @@ public class AzureStorageContainer
       }
     }
 
-    if (deletedFiles.isEmpty())
-    {
-      log.warn("No files were deleted on the following Azure path: [%s]", path);
+    if (deletedFiles.isEmpty()) {
+      log.warn("No files were deleted on the following Azure path: [%s]", virtualDirPath);
     }
 
     return deletedFiles;
   }
 
-  public void uploadBlob(final File file, final String destination)
+  public void uploadBlob(final File file, final String containerName, final String blobPath)
       throws IOException, StorageException, URISyntaxException
   {
+    CloudBlobContainer container = getCloudBlobContainer(containerName);
     try (FileInputStream stream = new FileInputStream(file)) {
-      CloudBlockBlob blob = cloudBlobContainer.getBlockBlobReference(destination);
+      CloudBlockBlob blob = container.getBlockBlobReference(blobPath);
       blob.upload(stream, file.length());
     }
   }
 
-  public InputStream getBlobInputStream(final String filePath) throws URISyntaxException, StorageException
+  public InputStream getBlobInputStream(final String containerName, final String blobPath)
+      throws URISyntaxException, StorageException
   {
-    return cloudBlobContainer.getBlockBlobReference(filePath).openInputStream();
+    CloudBlobContainer container = getCloudBlobContainer(containerName);
+    return container.getBlockBlobReference(blobPath).openInputStream();
   }
+
 }
diff --git a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorageDruidModule.java b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorageDruidModule.java
index c23f8372adc..8987ea2cd58 100644
--- a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorageDruidModule.java
+++ b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorageDruidModule.java
@@ -23,9 +23,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.inject.Binder;
 import com.google.inject.Provides;
 import com.microsoft.azure.storage.CloudStorageAccount;
-import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
 import io.druid.guice.Binders;
 import io.druid.guice.JsonConfigProvider;
 import io.druid.guice.LazySingleton;
@@ -90,10 +88,10 @@ public class AzureStorageDruidModule implements DruidModule
 
   @Provides
   @LazySingleton
-  public CloudStorageAccount getCloudStorageAccount(final AzureAccountConfig config)
+  public CloudBlobClient getCloudBlobClient(final AzureAccountConfig config)
       throws URISyntaxException, InvalidKeyException
   {
-    return CloudStorageAccount.parse(
+    CloudStorageAccount account = CloudStorageAccount.parse(
        String.format(
            STORAGE_CONNECTION_STRING,
            config.getProtocol(),
@@ -101,30 +99,16 @@ public class AzureStorageDruidModule implements DruidModule
            config.getKey()
        )
    );
+
+    return account.createCloudBlobClient();
   }
 
   @Provides
   @LazySingleton
-  public CloudBlobContainer getCloudBlobContainer(
-      final AzureAccountConfig config,
-      final CloudStorageAccount cloudStorageAccount
-  )
-      throws URISyntaxException, StorageException
-  {
-    CloudBlobClient blobClient = cloudStorageAccount.createCloudBlobClient();
-    CloudBlobContainer cloudBlobContainer = blobClient.getContainerReference(config.getContainer());
-
-    cloudBlobContainer.createIfNotExists();
-
-    return cloudBlobContainer;
-  }
-
-  @Provides
-  @LazySingleton
-  public AzureStorageContainer getAzureStorageContainer(
-      final CloudBlobContainer cloudBlobContainer
+  public AzureStorage getAzureStorageContainer(
+      final CloudBlobClient cloudBlobClient
   )
   {
-    return new AzureStorageContainer(cloudBlobContainer);
+    return new AzureStorage(cloudBlobClient);
   }
 }
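Reviewer note: Guice now provides a single CloudBlobClient, and AzureStorage resolves (and lazily creates) the target container on every call, so nothing is pinned to one container at wiring time. A minimal sketch of using the new class outside of Guice; the connection string, container name, and blob paths are all placeholders:

```java
package io.druid.storage.azure;

import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.CloudBlobClient;

import java.io.File;
import java.io.InputStream;

public class AzureStorageUsageSketch
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder connection string; the module builds the real one from AzureAccountConfig (protocol/account/key).
    final CloudBlobClient cloudBlobClient = CloudStorageAccount
        .parse("DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=mykey")
        .createCloudBlobClient();

    final AzureStorage azureStorage = new AzureStorage(cloudBlobClient);

    // The container is created on first use by getCloudBlobContainer(), so there is no separate provisioning step.
    azureStorage.uploadBlob(new File("index.zip"), "druid-container", "datasource/interval/version/0/index.zip");

    try (InputStream in = azureStorage.getBlobInputStream("druid-container", "datasource/interval/version/0/index.zip")) {
      System.out.println("first byte: " + in.read());
    }
  }
}
```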
diff --git a/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureByteSourceTest.java b/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureByteSourceTest.java
index 39e8700f3e9..fda493bb2e2 100644
--- a/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureByteSourceTest.java
+++ b/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureByteSourceTest.java
@@ -29,18 +29,20 @@ import static org.easymock.EasyMock.*;
 
 public class AzureByteSourceTest extends EasyMockSupport
 {
+
   @Test
   public void openStreamTest() throws IOException, URISyntaxException, StorageException
   {
-    final String filePath = "/path/to/file";
-    AzureStorageContainer azureStorageContainer = createMock(AzureStorageContainer.class);
+    final String containerName = "container";
+    final String blobPath = "/path/to/file";
+    AzureStorage azureStorage = createMock(AzureStorage.class);
     InputStream stream = createMock(InputStream.class);
 
-    expect(azureStorageContainer.getBlobInputStream(filePath)).andReturn(stream);
+    expect(azureStorage.getBlobInputStream(containerName, blobPath)).andReturn(stream);
 
     replayAll();
 
-    AzureByteSource byteSource = new AzureByteSource(azureStorageContainer, filePath);
+    AzureByteSource byteSource = new AzureByteSource(azureStorage, containerName, blobPath);
 
     byteSource.openStream();
 
@@ -50,14 +52,23 @@ public class AzureByteSourceTest extends EasyMockSupport
   @Test(expected = IOException.class)
   public void openStreamWithRecoverableErrorTest() throws URISyntaxException, StorageException, IOException
   {
-    final String filePath = "/path/to/file";
-    AzureStorageContainer azureStorageContainer = createMock(AzureStorageContainer.class);
+    final String containerName = "container";
+    final String blobPath = "/path/to/file";
+    AzureStorage azureStorage = createMock(AzureStorage.class);
 
-    expect(azureStorageContainer.getBlobInputStream(filePath)).andThrow(new StorageException("", "", 500, null, null));
+    expect(azureStorage.getBlobInputStream(containerName, blobPath)).andThrow(
+        new StorageException(
+            "",
+            "",
+            500,
+            null,
+            null
+        )
+    );
 
     replayAll();
 
-    AzureByteSource byteSource = new AzureByteSource(azureStorageContainer, filePath);
+    AzureByteSource byteSource = new AzureByteSource(azureStorage, containerName, blobPath);
 
     byteSource.openStream();
diff --git a/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentKillerTest.java b/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentKillerTest.java
index 9dd6864245c..1b1ce988be4 100644
--- a/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentKillerTest.java
+++ b/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentKillerTest.java
@@ -28,6 +28,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.net.URISyntaxException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -36,11 +37,14 @@ import static org.easymock.EasyMock.*;
 
 public class AzureDataSegmentKillerTest extends EasyMockSupport
 {
+  private static final String containerName = "container";
+  private static final String blobPath = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
+
   private static final DataSegment dataSegment = new DataSegment(
       "test",
       new Interval("2015-04-12/2015-04-13"),
       "1",
-      ImmutableMap.of("storageDir", "/path/to/storage/"),
+      ImmutableMap.of("containerName", containerName, "blobPath", blobPath),
       null,
       null,
       new NoneShardSpec(),
@@ -48,12 +52,12 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
       1
   );
 
-  private AzureStorageContainer azureStorageContainer;
+  private AzureStorage azureStorage;
 
   @Before
   public void before()
   {
-    azureStorageContainer = createMock(AzureStorageContainer.class);
+    azureStorage = createMock(AzureStorage.class);
   }
 
   @Test
@@ -61,12 +65,13 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
   {
     List<String> deletedFiles = new ArrayList<>();
+    final String dirPath = Paths.get(blobPath).getParent().toString();
 
-    expect(azureStorageContainer.emptyCloudBlobDirectory("/path/to/storage/")).andReturn(deletedFiles);
+    expect(azureStorage.emptyCloudBlobDirectory(containerName, dirPath)).andReturn(deletedFiles);
 
     replayAll();
 
-    AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorageContainer);
+    AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage);
 
     killer.kill(dataSegment);
 
@@ -77,7 +82,9 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
   public void killWithErrorTest() throws SegmentLoadingException, URISyntaxException, StorageException
   {
-    expect(azureStorageContainer.emptyCloudBlobDirectory("/path/to/storage/")).andThrow(
+    String dirPath = Paths.get(blobPath).getParent().toString();
+
+    expect(azureStorage.emptyCloudBlobDirectory(containerName, dirPath)).andThrow(
         new StorageException(
             "",
             "",
@@ -89,7 +96,7 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
 
     replayAll();
 
-    AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorageContainer);
+    AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage);
 
     killer.kill(dataSegment);
diff --git a/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPullerTest.java b/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPullerTest.java
index 209f3e0d2f9..7a5eee17c62 100644
--- a/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPullerTest.java
+++ b/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPullerTest.java
@@ -19,7 +19,6 @@ package io.druid.storage.azure;
 
 import com.google.common.collect.ImmutableMap;
 import com.metamx.common.FileUtils;
-import com.metamx.common.MapUtils;
 import com.microsoft.azure.storage.StorageException;
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.timeline.DataSegment;
@@ -35,7 +34,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
-import java.util.Map;
 
 import static org.easymock.EasyMock.*;
 import static org.junit.Assert.assertEquals;
@@ -45,13 +43,15 @@ import static org.junit.Assert.assertTrue;
 
 public class AzureDataSegmentPullerTest extends EasyMockSupport
 {
-  private AzureStorageContainer azureStorageContainer;
+  private AzureStorage azureStorage;
   private static final String SEGMENT_FILE_NAME = "segment";
+  private static final String containerName = "container";
+  private static final String blobPath = "/path/to/storage/index.zip";
   private static final DataSegment dataSegment = new DataSegment(
       "test",
       new Interval("2015-04-12/2015-04-13"),
       "1",
-      ImmutableMap.of("storageDir", "/path/to/storage/"),
+      ImmutableMap.of("containerName", containerName, "blobPath", blobPath),
       null,
       null,
       new NoneShardSpec(),
@@ -62,7 +62,7 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
   @Before
   public void before()
   {
-    azureStorageContainer = createMock(AzureStorageContainer.class);
+    azureStorage = createMock(AzureStorage.class);
   }
 
   @Test
@@ -73,16 +73,15 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
     pulledFile.deleteOnExit();
     final File toDir = Files.createTempDirectory("druid").toFile();
     toDir.deleteOnExit();
-    final File zipFilePath = new File(pulledFile.getParent(), AzureStorageDruidModule.INDEX_ZIP_FILE_NAME);
     final InputStream zipStream = new FileInputStream(pulledFile);
 
-    expect(azureStorageContainer.getBlobInputStream(zipFilePath.getAbsolutePath())).andReturn(zipStream);
+    expect(azureStorage.getBlobInputStream(containerName, blobPath)).andReturn(zipStream);
 
     replayAll();
 
-    AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorageContainer);
+    AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);
 
-    FileUtils.FileCopyResult result = puller.getSegmentFiles(pulledFile.getParent(), toDir);
+    FileUtils.FileCopyResult result = puller.getSegmentFiles(containerName, blobPath, toDir);
 
     File expected = new File(toDir, SEGMENT_FILE_NAME);
     assertEquals(value.length(), result.size());
@@ -97,14 +96,10 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
       throws IOException, URISyntaxException, StorageException, SegmentLoadingException
   {
-    final File fromDir = Files.createTempDirectory("druidA").toFile();
-    fromDir.deleteOnExit();
     final File outDir = Files.createTempDirectory("druid").toFile();
     outDir.deleteOnExit();
 
-    final File zipFilePath = new File(fromDir.getAbsolutePath(), AzureStorageDruidModule.INDEX_ZIP_FILE_NAME);
-
-    expect(azureStorageContainer.getBlobInputStream(zipFilePath.getAbsolutePath())).andThrow(
+    expect(azureStorage.getBlobInputStream(containerName, blobPath)).andThrow(
        new StorageException(
            "error",
            "error",
@@ -116,9 +111,9 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
 
     replayAll();
 
-    AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorageContainer);
+    AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);
 
-    puller.getSegmentFiles(fromDir.getAbsolutePath(), outDir);
+    puller.getSegmentFiles(containerName, blobPath, outDir);
 
     assertFalse(outDir.exists());
 
@@ -130,14 +125,12 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
   public void getSegmentFilesTest() throws SegmentLoadingException
   {
     final File outDir = new File("");
-    final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
-    final String storageDir = MapUtils.getString(loadSpec, "storageDir");
     final FileUtils.FileCopyResult result = createMock(FileUtils.FileCopyResult.class);
     final AzureDataSegmentPuller puller = createMockBuilder(AzureDataSegmentPuller.class).withConstructor(
-        azureStorageContainer
-    ).addMockedMethod("getSegmentFiles", String.class, File.class).createMock();
+        azureStorage
+    ).addMockedMethod("getSegmentFiles", String.class, String.class, File.class).createMock();
 
-    expect(puller.getSegmentFiles(storageDir, outDir)).andReturn(result);
+    expect(puller.getSegmentFiles(containerName, blobPath, outDir)).andReturn(result);
 
     replayAll();
 
@@ -153,7 +146,7 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
     File outDir = Files.createTempDirectory("druid").toFile();
 
     try {
-      AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorageContainer);
+      AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);
       puller.prepareOutDir(outDir);
 
       assertTrue(outDir.exists());
diff --git a/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java b/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java
index d154a8feae8..e05c0fc930b 100644
--- a/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java
+++ b/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java
@@ -32,9 +32,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.nio.file.Files;
 import java.util.Map;
-import java.util.concurrent.Callable;
 
 import static org.junit.Assert.assertEquals;
 
@@ -42,11 +40,13 @@ import static org.easymock.EasyMock.*;
 
 public class AzureDataSegmentPusherTest extends EasyMockSupport
 {
+  private static final String containerName = "container";
+  private static final String blobPath = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
   private static final DataSegment dataSegment = new DataSegment(
       "test",
       new Interval("2015-04-12/2015-04-13"),
       "1",
-      ImmutableMap.of("storageDir", "/path/to/storage/"),
+      ImmutableMap.of("containerName", containerName, "blobPath", blobPath),
       null,
       null,
       new NoneShardSpec(),
@@ -54,14 +54,14 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
       1
   );
 
-  private AzureStorageContainer azureStorageContainer;
+  private AzureStorage azureStorage;
   private AzureAccountConfig azureAccountConfig;
   private ObjectMapper jsonMapper;
 
   @Before
   public void before()
   {
-    azureStorageContainer = createMock(AzureStorageContainer.class);
+    azureStorage = createMock(AzureStorage.class);
     azureAccountConfig = createMock(AzureAccountConfig.class);
     jsonMapper = createMock(ObjectMapper.class);
 
@@ -71,11 +71,10 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
   public void getAzurePathsTest()
   {
     final String storageDir = DataSegmentPusherUtil.getStorageDir(dataSegment);
-    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorageContainer, azureAccountConfig, jsonMapper);
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
 
     Map<String, String> paths = pusher.getAzurePaths(dataSegment);
 
-    assertEquals(storageDir, paths.get("storage"));
     assertEquals(String.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME), paths.get("index"));
     assertEquals(
         String.format("%s/%s", storageDir, AzureStorageDruidModule.DESCRIPTOR_FILE_NAME),
@@ -83,50 +82,23 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     );
   }
 
-  @Test
-  public void uploadThenDeleteTest() throws StorageException, IOException, URISyntaxException
-  {
-    final File file = AzureTestUtils.createZipTempFile("segment", "bucket");
-    file.deleteOnExit();
-    final String azureDestPath = "/azure/path/";
-    azureStorageContainer.uploadBlob(file, azureDestPath);
-    expectLastCall();
-
-    replayAll();
-
-    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorageContainer, azureAccountConfig, jsonMapper);
-
-    pusher.uploadThenDelete(file, azureDestPath);
-
-    verifyAll();
-  }
-
   @Test
   public void uploadDataSegmentTest() throws StorageException, IOException, URISyntaxException
   {
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
     final int version = 9;
-    final File compressedSegmentData = AzureTestUtils.createZipTempFile("segment", "bucket");
-    compressedSegmentData.deleteOnExit();
-    final File descriptorFile = Files.createTempFile("descriptor", ".json").toFile();
-    descriptorFile.deleteOnExit();
-    final Map<String, String> azurePaths = ImmutableMap.of(
-        "index",
-        "/path/to/azure/storage",
-        "descriptor",
-        "/path/to/azure/storage",
-        "storage",
-        "/path/to/azure/storage"
-    );
+    final File compressedSegmentData = new File("index.zip");
+    final File descriptorFile = new File("descriptor.json");
+    final Map<String, String> azurePaths = pusher.getAzurePaths(dataSegment);
 
-    azureStorageContainer.uploadBlob(compressedSegmentData, azurePaths.get("index"));
+    expect(azureAccountConfig.getContainer()).andReturn(containerName).times(3);
+    azureStorage.uploadBlob(compressedSegmentData, containerName, azurePaths.get("index"));
     expectLastCall();
-    azureStorageContainer.uploadBlob(descriptorFile, azurePaths.get("descriptor"));
+    azureStorage.uploadBlob(descriptorFile, containerName, azurePaths.get("descriptor"));
     expectLastCall();
 
     replayAll();
 
-    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorageContainer, azureAccountConfig, jsonMapper);
-
     DataSegment pushedDataSegment = pusher.uploadDataSegment(
         dataSegment,
         version,
@@ -139,23 +111,10 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     assertEquals(version, (int) pushedDataSegment.getBinaryVersion());
     Map<String, Object> loadSpec = pushedDataSegment.getLoadSpec();
     assertEquals(AzureStorageDruidModule.SCHEME, MapUtils.getString(loadSpec, "type"));
-    assertEquals(azurePaths.get("storage"), MapUtils.getString(loadSpec, "storageDir"));
+    assertEquals(azurePaths.get("index"), MapUtils.getString(loadSpec, "blobPath"));
 
     verifyAll();
   }
 
-  @Test
-  public void t() throws IOException
-  {
-    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorageContainer, azureAccountConfig, jsonMapper);
-
-    File dir = Files.createTempDirectory("druid").toFile();
-    dir.deleteOnExit();
-
-    File x = pusher.createCompressedSegmentDataFile(dir);
-
-    System.out.println(x);
-  }
-
-}
\ No newline at end of file
+}
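Reviewer note: taken together, a push now yields a (containerName, blobPath) pair that the puller can consume directly, with no storageDir indirection in between. A hedged end-to-end sketch that only uses methods visible in this patch; the pusher, puller, segment, files, and version number are assumed to be supplied by the caller, and error handling is omitted:

```java
package io.druid.storage.azure;

import com.metamx.common.MapUtils;
import io.druid.timeline.DataSegment;

import java.io.File;
import java.util.Map;

public class AzurePushPullSketch
{
  public static void roundTrip(
      AzureDataSegmentPusher pusher,
      AzureDataSegmentPuller puller,
      DataSegment segment,
      File compressedSegmentData,
      File descriptorFile,
      File outDir
  ) throws Exception
  {
    // Push: uploads index.zip and descriptor.json, deletes the local files, and returns a segment
    // whose load spec carries containerName/blobPath (blobPath points at the uploaded index.zip).
    final DataSegment pushed = pusher.uploadDataSegment(
        segment,
        9,                               // binary version (placeholder)
        compressedSegmentData,
        descriptorFile,
        pusher.getAzurePaths(segment)
    );

    // Pull: resolve the blob from the load spec, exactly as AzureDataSegmentPuller.pull() does.
    final Map<String, Object> loadSpec = pushed.getLoadSpec();
    puller.getSegmentFiles(
        MapUtils.getString(loadSpec, "containerName"),
        MapUtils.getString(loadSpec, "blobPath"),
        outDir
    );
  }
}
```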