diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPuller.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPuller.java
index d9b5320dbdc..ebdb79bbebc 100644
--- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPuller.java
+++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPuller.java
@@ -38,6 +38,13 @@ public class AzureDataSegmentPuller implements DataSegmentPuller
 {
   private static final Logger log = new Logger(AzureDataSegmentPuller.class);
 
+  // The Azure Storage Hadoop access pattern is:
+  // wasb[s]://<containername>@<accountname>.blob.core.windows.net/<path>
+  // (from https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-blob-storage)
+  static final String AZURE_STORAGE_HADOOP_PROTOCOL = "wasbs";
+
+  static final String AZURE_STORAGE_HOST_ADDRESS = "blob.core.windows.net";
+
   private final AzureStorage azureStorage;
 
   @Inject
@@ -58,7 +65,21 @@ public class AzureDataSegmentPuller implements DataSegmentPuller
     try {
       prepareOutDir(outDir);
 
-      final ByteSource byteSource = new AzureByteSource(azureStorage, containerName, blobPath);
+      log.info(
+          "Loading container: [%s], with blobPath: [%s] and outDir: [%s]", containerName, blobPath, outDir
+      );
+
+      boolean blobPathIsHadoop = blobPath.contains(AZURE_STORAGE_HOST_ADDRESS);
+      final String actualBlobPath;
+      if (blobPathIsHadoop) {
+        // Remove Azure's Hadoop prefix to match the realtime ingestion path
+        actualBlobPath = blobPath.substring(
+            blobPath.indexOf(AZURE_STORAGE_HOST_ADDRESS) + AZURE_STORAGE_HOST_ADDRESS.length() + 1);
+      } else {
+        actualBlobPath = blobPath;
+      }
+
+      final ByteSource byteSource = new AzureByteSource(azureStorage, containerName, actualBlobPath);
       final io.druid.java.util.common.FileUtils.FileCopyResult result = CompressionUtils.unzip(
           byteSource,
           outDir,
@@ -66,7 +87,7 @@ public class AzureDataSegmentPuller implements DataSegmentPuller
           false
       );
 
-      log.info("Loaded %d bytes from [%s] to [%s]", result.size(), blobPath, outDir.getAbsolutePath());
+      log.info("Loaded %d bytes from [%s] to [%s]", result.size(), actualBlobPath, outDir.getAbsolutePath());
 
       return result;
     }
     catch (IOException e) {
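
Note on the puller change above: a minimal standalone sketch (not part of the patch; the container, account, and segment names are made up) of what the new prefix-stripping logic does to a Hadoop-style blob path:

    // Illustration of the prefix stripping in AzureDataSegmentPuller; all values hypothetical.
    public class BlobPathStrippingExample
    {
      static final String AZURE_STORAGE_HOST_ADDRESS = "blob.core.windows.net";

      public static void main(String[] args)
      {
        // A blob path as written by the Hadoop indexer through wasbs://
        String blobPath = "container@account.blob.core.windows.net/druid/segments/ds/index.zip";

        // Same expression as the patch: keep only the part after "<host>/"
        String actualBlobPath = blobPath.substring(
            blobPath.indexOf(AZURE_STORAGE_HOST_ADDRESS) + AZURE_STORAGE_HOST_ADDRESS.length() + 1
        );

        System.out.println(actualBlobPath); // prints: druid/segments/ds/index.zip
      }
    }

The "+ 1" skips the "/" that follows the host, so the result matches the container-relative blob path that realtime ingestion stores in the load spec.
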
diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java
index dc14667607d..2cb296743ef 100644
--- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java
+++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java
@@ -31,6 +31,7 @@ import io.druid.java.util.common.logger.Logger;
 import io.druid.segment.SegmentUtils;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.timeline.DataSegment;
+import org.joda.time.format.ISODateTimeFormat;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -70,7 +71,38 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
   @Override
   public String getPathForHadoop()
   {
-    return null;
+    String hadoopPath = StringUtils.format(
+        "%s://%s@%s.%s/",
+        AzureDataSegmentPuller.AZURE_STORAGE_HADOOP_PROTOCOL,
+        config.getContainer(),
+        config.getAccount(),
+        AzureDataSegmentPuller.AZURE_STORAGE_HOST_ADDRESS
+    );
+
+    log.info("Using Azure blob storage Hadoop path: %s", hadoopPath);
+
+    return hadoopPath;
+  }
+
+  @Override
+  public String getStorageDir(DataSegment dataSegment)
+  {
+    // Replace colons with underscores, since they are not supported through the wasb:// prefix
+    String seg = JOINER.join(
+        dataSegment.getDataSource(),
+        StringUtils.format(
+            "%s_%s",
+            // Use the ISODateTimeFormat.basicDateTime() format, to avoid colons in the file path.
+            dataSegment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()),
+            dataSegment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime())
+        ),
+        dataSegment.getVersion().replace(":", "_"),
+        dataSegment.getShardSpec().getPartitionNum()
+    );
+
+    log.info("DataSegment: [%s]", seg);
+
+    return seg;
   }
 
   @Override
@@ -103,7 +135,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
   public DataSegment uploadDataSegment(
       DataSegment segment,
-      final int version,
+      final int binaryVersion,
       final long size,
       final File compressedSegmentData,
       final File descriptorFile,
@@ -118,7 +150,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
     final DataSegment outSegment = segment
         .withSize(size)
         .withLoadSpec(this.makeLoadSpec(new URI(azurePaths.get("index"))))
-        .withBinaryVersion(version);
+        .withBinaryVersion(binaryVersion);
 
     log.info("Deleting file [%s]", compressedSegmentData);
     compressedSegmentData.delete();
@@ -135,7 +167,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
   {
     log.info("Uploading [%s] to Azure.", indexFilesDir);
 
-    final int version = SegmentUtils.getVersionFromDir(indexFilesDir);
+    final int binaryVersion = SegmentUtils.getVersionFromDir(indexFilesDir);
 
     File zipOutFile = null;
     File descriptorFile = null;
@@ -147,7 +179,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
       final Map<String, String> azurePaths = getAzurePaths(segment);
 
       return AzureUtils.retryAzureOperation(
-          () -> uploadDataSegment(segment, version, size, outFile, descFile, azurePaths, replaceExisting),
+          () -> uploadDataSegment(segment, binaryVersion, size, outFile, descFile, azurePaths, replaceExisting),
           config.getMaxTries()
       );
     }
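
A small sketch of why the pusher change above uses ISODateTimeFormat.basicDateTime() for interval endpoints (assumptions: Joda-Time on the classpath, a hypothetical UTC instant; not part of the patch):

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.format.ISODateTimeFormat;

    // The extended ISO format contains colons, which the wasb:// path handling
    // rejects; the basic format does not.
    public class SegmentPathFormatExample
    {
      public static void main(String[] args)
      {
        DateTime start = new DateTime(2018, 1, 5, 14, 54, 9, 295, DateTimeZone.UTC);

        // Extended ISO format: e.g. 2018-01-05T14:54:09.295Z (colons in the time part)
        System.out.println(start.toString(ISODateTimeFormat.dateTime()));

        // Basic ISO format: e.g. 20180105T145409.295Z (no colons, safe in a blob path)
        System.out.println(start.toString(ISODateTimeFormat.basicDateTime()));
      }
    }

The interval endpoints get this treatment from the formatter, while the segment version is an arbitrary string and still needs the explicit replace(":", "_") above.
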
diff --git a/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java b/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java
index 30c571a4f16..7d2a7917bb6 100644
--- a/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java
+++ b/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java
@@ -128,7 +128,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
   public void uploadDataSegmentTest() throws StorageException, IOException, URISyntaxException
   {
     AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
-    final int version = 9;
+    final int binaryVersion = 9;
     final File compressedSegmentData = new File("index.zip");
     final File descriptorFile = new File("descriptor.json");
     final Map<String, String> azurePaths = pusher.getAzurePaths(dataSegment);
@@ -142,7 +142,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
 
     DataSegment pushedDataSegment = pusher.uploadDataSegment(
         dataSegment,
-        version,
+        binaryVersion,
         0, // empty file
         compressedSegmentData,
         descriptorFile,
@@ -151,11 +151,28 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     );
 
     assertEquals(compressedSegmentData.length(), pushedDataSegment.getSize());
-    assertEquals(version, (int) pushedDataSegment.getBinaryVersion());
+    assertEquals(binaryVersion, (int) pushedDataSegment.getBinaryVersion());
 
     Map<String, Object> loadSpec = pushedDataSegment.getLoadSpec();
     assertEquals(AzureStorageDruidModule.SCHEME, MapUtils.getString(loadSpec, "type"));
     assertEquals(azurePaths.get("index"), MapUtils.getString(loadSpec, "blobPath"));
 
     verifyAll();
   }
+
+  @Test
+  public void getPathForHadoopTest()
+  {
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
+    String hadoopPath = pusher.getPathForHadoop();
+    Assert.assertEquals("wasbs://container@account.blob.core.windows.net/", hadoopPath);
+  }
+
+  @Test
+  public void storageDirContainsNoColonsTest()
+  {
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
+    DataSegment withColons = dataSegment.withVersion("2018-01-05T14:54:09.295Z");
+    String segmentPath = pusher.getStorageDir(withColons);
+    Assert.assertFalse("Path should not contain any colons", segmentPath.contains(":"));
+  }
 }
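
Finally, a hypothetical extra test (a sketch only, not in the patch) of how the two tested methods compose into a full Hadoop output location. It would live in the same test class and reuse its fixtures, and it assumes the Hadoop indexer joins getPathForHadoop() with the segment's storage directory:

      @Test
      public void hadoopPathCompositionSketch()
      {
        // Hypothetical: the Hadoop task's output location, assembled from the two
        // methods exercised above. Fixture names follow the existing tests.
        AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
        String fullPath = pusher.getPathForHadoop() + pusher.getStorageDir(dataSegment);

        // Everything past the wasbs:// scheme must stay colon-free, which is the
        // point of the getStorageDir() formatting above.
        Assert.assertFalse(fullPath.substring("wasbs://".length()).contains(":"));
      }
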