Support Hadoop batch ingestion for druid-azure-extensions (#5221)

* Support Hadoop batch ingestion for druid-azure-extensions #5181

* Fix indentation issues

* Fix forbidden-apis violation

* Code & doc improvements for azure-extensions

* Rename version to binaryVersion where appropriate to avoid confusion
* Set default protocol to wasbs://, as recommended by the Azure docs
* Add link to Azure documentation for wasb(s):// path
* Remove any colons from dataSegment.getVersion()
* Add test verifying that colons in dataSegment.getVersion() are replaced
* Use StringUtils.format for String concatenation
* Remove empty lines

* Remove unneeded StringUtils.format from log.info
Spyros Kapnissis 2018-02-15 15:45:18 +00:00 committed by Roman Leventov
parent cd929000ca
commit be38b18a85
3 changed files with 80 additions and 10 deletions
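For context, the wasbs:// default mentioned in the commit message yields Hadoop paths of the form wasb[s]://<containername>@<accountname>.blob.core.windows.net/<path>. Below is a minimal, self-contained sketch (not part of the diff; the container and account names are made up, and in the real code they come from the extension's Azure account config):

public class WasbPathExample
{
  public static void main(String[] args)
  {
    // Hypothetical values; the pusher reads these from its Azure account config.
    String container = "druid-segments";
    String account = "myaccount";

    // Same shape that getPathForHadoop() in the diff below builds via StringUtils.format.
    String hadoopPath = String.format("wasbs://%s@%s.blob.core.windows.net/", container, account);
    System.out.println(hadoopPath); // -> wasbs://druid-segments@myaccount.blob.core.windows.net/
  }
}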

AzureDataSegmentPuller.java

@@ -38,6 +38,13 @@ public class AzureDataSegmentPuller implements DataSegmentPuller
 {
   private static final Logger log = new Logger(AzureDataSegmentPuller.class);
 
+  // The azure storage hadoop access pattern is:
+  // wasb[s]://<containername>@<accountname>.blob.core.windows.net/<path>
+  // (from https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-blob-storage)
+  static final String AZURE_STORAGE_HADOOP_PROTOCOL = "wasbs";
+  static final String AZURE_STORAGE_HOST_ADDRESS = "blob.core.windows.net";
+
   private final AzureStorage azureStorage;
 
   @Inject
@@ -58,7 +65,21 @@ public class AzureDataSegmentPuller implements DataSegmentPuller
     try {
       prepareOutDir(outDir);
-      final ByteSource byteSource = new AzureByteSource(azureStorage, containerName, blobPath);
+      log.info(
+          "Loading container: [%s], with blobPath: [%s] and outDir: [%s]", containerName, blobPath, outDir
+      );
+      boolean blobPathIsHadoop = blobPath.contains(AZURE_STORAGE_HOST_ADDRESS);
+      final String actualBlobPath;
+      if (blobPathIsHadoop) {
+        // Remove azure's hadoop prefix to match realtime ingestion path
+        actualBlobPath = blobPath.substring(
+            blobPath.indexOf(AZURE_STORAGE_HOST_ADDRESS) + AZURE_STORAGE_HOST_ADDRESS.length() + 1);
+      } else {
+        actualBlobPath = blobPath;
+      }
+      final ByteSource byteSource = new AzureByteSource(azureStorage, containerName, actualBlobPath);
       final io.druid.java.util.common.FileUtils.FileCopyResult result = CompressionUtils.unzip(
           byteSource,
           outDir,
@@ -66,7 +87,7 @@ public class AzureDataSegmentPuller implements DataSegmentPuller
           false
       );
-      log.info("Loaded %d bytes from [%s] to [%s]", result.size(), blobPath, outDir.getAbsolutePath());
+      log.info("Loaded %d bytes from [%s] to [%s]", result.size(), actualBlobPath, outDir.getAbsolutePath());
       return result;
     }
     catch (IOException e) {
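To make the prefix stripping above easier to follow, here is a standalone sketch of the same normalization with hypothetical blob paths (the exact form of a Hadoop-written path may differ; the point is that everything through "blob.core.windows.net/" is dropped so the remainder matches the container-relative paths used by realtime ingestion):

public class BlobPathNormalizationExample
{
  static final String AZURE_STORAGE_HOST_ADDRESS = "blob.core.windows.net";

  // Mirrors the puller logic: keep only the part after "blob.core.windows.net/".
  static String normalize(String blobPath)
  {
    if (!blobPath.contains(AZURE_STORAGE_HOST_ADDRESS)) {
      return blobPath;
    }
    return blobPath.substring(
        blobPath.indexOf(AZURE_STORAGE_HOST_ADDRESS) + AZURE_STORAGE_HOST_ADDRESS.length() + 1
    );
  }

  public static void main(String[] args)
  {
    // Hypothetical Hadoop-written path vs. a realtime-style container-relative path.
    System.out.println(normalize("container@account.blob.core.windows.net/ds/2018/v1/0/index.zip"));
    // -> ds/2018/v1/0/index.zip
    System.out.println(normalize("ds/2018/v1/0/index.zip"));
    // -> ds/2018/v1/0/index.zip (unchanged)
  }
}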

AzureDataSegmentPusher.java

@@ -31,6 +31,7 @@ import io.druid.java.util.common.logger.Logger;
 import io.druid.segment.SegmentUtils;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.timeline.DataSegment;
+import org.joda.time.format.ISODateTimeFormat;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -70,7 +71,38 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
   @Override
   public String getPathForHadoop()
   {
-    return null;
+    String hadoopPath = StringUtils.format(
+        "%s://%s@%s.%s/",
+        AzureDataSegmentPuller.AZURE_STORAGE_HADOOP_PROTOCOL,
+        config.getContainer(),
+        config.getAccount(),
+        AzureDataSegmentPuller.AZURE_STORAGE_HOST_ADDRESS
+    );
+    log.info("Using Azure blob storage Hadoop path: %s", hadoopPath);
+    return hadoopPath;
+  }
+
+  @Override
+  public String getStorageDir(DataSegment dataSegment)
+  {
+    String seg = JOINER.join(
+        dataSegment.getDataSource(),
+        StringUtils.format(
+            "%s_%s",
+            // Use ISODateTimeFormat.basicDateTime() format, to avoid using colons in file path.
+            dataSegment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()),
+            dataSegment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime())
+        ),
+        dataSegment.getVersion().replace(":", "_"),
+        dataSegment.getShardSpec().getPartitionNum()
+    );
+
+    log.info("DataSegment: [%s]", seg);
+
+    // Replace colons with underscores, since they are not supported through wasb:// prefix
+    return seg;
   }
 
   @Override
@@ -103,7 +135,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
   public DataSegment uploadDataSegment(
       DataSegment segment,
-      final int version,
+      final int binaryVersion,
       final long size,
       final File compressedSegmentData,
       final File descriptorFile,
@ -118,7 +150,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
final DataSegment outSegment = segment final DataSegment outSegment = segment
.withSize(size) .withSize(size)
.withLoadSpec(this.makeLoadSpec(new URI(azurePaths.get("index")))) .withLoadSpec(this.makeLoadSpec(new URI(azurePaths.get("index"))))
.withBinaryVersion(version); .withBinaryVersion(binaryVersion);
log.info("Deleting file [%s]", compressedSegmentData); log.info("Deleting file [%s]", compressedSegmentData);
compressedSegmentData.delete(); compressedSegmentData.delete();
@@ -135,7 +167,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
   {
     log.info("Uploading [%s] to Azure.", indexFilesDir);
-    final int version = SegmentUtils.getVersionFromDir(indexFilesDir);
+    final int binaryVersion = SegmentUtils.getVersionFromDir(indexFilesDir);
 
     File zipOutFile = null;
     File descriptorFile = null;
@@ -147,7 +179,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
       final Map<String, String> azurePaths = getAzurePaths(segment);
 
       return AzureUtils.retryAzureOperation(
-          () -> uploadDataSegment(segment, version, size, outFile, descFile, azurePaths, replaceExisting),
+          () -> uploadDataSegment(segment, binaryVersion, size, outFile, descFile, azurePaths, replaceExisting),
           config.getMaxTries()
       );
     }
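To see what the colon-free layout from getStorageDir() looks like in practice, here is a small self-contained sketch (Joda-Time only, not part of the diff; the datasource name, interval, version, and partition number are made up, and the printed value is indicative rather than asserted):

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;

public class StorageDirFormatExample
{
  public static void main(String[] args)
  {
    DateTime start = new DateTime("2018-01-05T14:54:09.295Z", DateTimeZone.UTC);
    DateTime end = new DateTime("2018-01-06T14:54:09.295Z", DateTimeZone.UTC);

    // basicDateTime() avoids the colons of the default ISO form, and the
    // segment version has its colons replaced, so the whole path is wasb(s)-safe.
    String dir = String.join(
        "/",
        "example_datasource",
        start.toString(ISODateTimeFormat.basicDateTime())
            + "_" + end.toString(ISODateTimeFormat.basicDateTime()),
        "2018-01-05T14:54:09.295Z".replace(":", "_"),
        String.valueOf(0)
    );
    System.out.println(dir);
    // e.g. example_datasource/20180105T145409.295Z_20180106T145409.295Z/2018-01-05T14_54_09.295Z/0
  }
}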

AzureDataSegmentPusherTest.java

@@ -128,7 +128,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
   public void uploadDataSegmentTest() throws StorageException, IOException, URISyntaxException
   {
     AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
-    final int version = 9;
+    final int binaryVersion = 9;
     final File compressedSegmentData = new File("index.zip");
     final File descriptorFile = new File("descriptor.json");
     final Map<String, String> azurePaths = pusher.getAzurePaths(dataSegment);
@@ -142,7 +142,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     DataSegment pushedDataSegment = pusher.uploadDataSegment(
         dataSegment,
-        version,
+        binaryVersion,
         0, // empty file
         compressedSegmentData,
         descriptorFile,
@@ -151,11 +151,28 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     );
 
     assertEquals(compressedSegmentData.length(), pushedDataSegment.getSize());
-    assertEquals(version, (int) pushedDataSegment.getBinaryVersion());
+    assertEquals(binaryVersion, (int) pushedDataSegment.getBinaryVersion());
     Map<String, Object> loadSpec = pushedDataSegment.getLoadSpec();
     assertEquals(AzureStorageDruidModule.SCHEME, MapUtils.getString(loadSpec, "type"));
     assertEquals(azurePaths.get("index"), MapUtils.getString(loadSpec, "blobPath"));
 
     verifyAll();
   }
 
+  @Test
+  public void getPathForHadoopTest()
+  {
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
+    String hadoopPath = pusher.getPathForHadoop();
+    Assert.assertEquals("wasbs://container@account.blob.core.windows.net/", hadoopPath);
+  }
+
+  @Test
+  public void storageDirContainsNoColonsTest()
+  {
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
+    DataSegment withColons = dataSegment.withVersion("2018-01-05T14:54:09.295Z");
+    String segmentPath = pusher.getStorageDir(withColons);
+    Assert.assertFalse("Path should not contain any colons", segmentPath.contains(":"));
+  }
 }