mirror of https://github.com/apache/druid.git
Overall improvement on Azure Deep Storage extension.
* Remove hard-coded azure path manipulation from the puller. * Fix segment size not being zero after uploading it do Azure. * Remove both index and desc files only on a success upload to Azure. * Add Azure container name to load spec. This patch would help future-proof azure deep-storage module and avoid having to introduce ugly backwards-compatibility fixes when we want to support multiple containers or moving data between containers.
This commit is contained in:
parent
7c4054aaa3
commit
11a76169b4
|
@ -28,23 +28,26 @@ import java.net.URISyntaxException;
|
|||
public class AzureByteSource extends ByteSource
|
||||
{
|
||||
|
||||
final private AzureStorageContainer azureStorageContainer;
|
||||
final private String filePath;
|
||||
final private AzureStorage azureStorage;
|
||||
final private String containerName;
|
||||
final private String blobPath;
|
||||
|
||||
public AzureByteSource(
|
||||
AzureStorageContainer azureStorageContainer,
|
||||
String filePath
|
||||
AzureStorage azureStorage,
|
||||
String containerName,
|
||||
String blobPath
|
||||
)
|
||||
{
|
||||
this.azureStorageContainer = azureStorageContainer;
|
||||
this.filePath = filePath;
|
||||
this.azureStorage = azureStorage;
|
||||
this.containerName = containerName;
|
||||
this.blobPath = blobPath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream openStream() throws IOException
|
||||
{
|
||||
try {
|
||||
return azureStorageContainer.getBlobInputStream(filePath);
|
||||
return azureStorage.getBlobInputStream(containerName, blobPath);
|
||||
}
|
||||
catch (StorageException | URISyntaxException e) {
|
||||
if (AzureUtils.AZURE_RETRY.apply(e)) {
|
||||
|
|
|
@ -26,20 +26,21 @@ import io.druid.segment.loading.SegmentLoadingException;
|
|||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Map;
|
||||
|
||||
public class AzureDataSegmentKiller implements DataSegmentKiller
|
||||
{
|
||||
private static final Logger log = new Logger(AzureDataSegmentKiller.class);
|
||||
|
||||
private final AzureStorageContainer azureStorageContainer;
|
||||
private final AzureStorage azureStorage;
|
||||
|
||||
@Inject
|
||||
public AzureDataSegmentKiller(
|
||||
final AzureStorageContainer azureStorageContainer
|
||||
final AzureStorage azureStorage
|
||||
)
|
||||
{
|
||||
this.azureStorageContainer = azureStorageContainer;
|
||||
this.azureStorage = azureStorage;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -48,10 +49,12 @@ public class AzureDataSegmentKiller implements DataSegmentKiller
|
|||
log.info("Killing segment [%s]", segment);
|
||||
|
||||
Map<String, Object> loadSpec = segment.getLoadSpec();
|
||||
String storageDir = MapUtils.getString(loadSpec, "storageDir");
|
||||
final String containerName = MapUtils.getString(loadSpec, "containerName");
|
||||
final String blobPath = MapUtils.getString(loadSpec, "blobPath");
|
||||
final String dirPath = Paths.get(blobPath).getParent().toString();
|
||||
|
||||
try {
|
||||
azureStorageContainer.emptyCloudBlobDirectory(storageDir);
|
||||
azureStorage.emptyCloudBlobDirectory(containerName, dirPath);
|
||||
}
|
||||
catch (StorageException | URISyntaxException e) {
|
||||
throw new SegmentLoadingException(e, "Couldn't kill segment[%s]", segment.getIdentifier());
|
||||
|
|
|
@ -36,25 +36,28 @@ public class AzureDataSegmentPuller implements DataSegmentPuller
|
|||
{
|
||||
private static final Logger log = new Logger(AzureDataSegmentPuller.class);
|
||||
|
||||
private final AzureStorageContainer azureStorageContainer;
|
||||
private final AzureStorage azureStorage;
|
||||
|
||||
@Inject
|
||||
public AzureDataSegmentPuller(
|
||||
AzureStorageContainer azureStorageContainer
|
||||
AzureStorage azureStorage
|
||||
)
|
||||
{
|
||||
this.azureStorageContainer = azureStorageContainer;
|
||||
this.azureStorage = azureStorage;
|
||||
}
|
||||
|
||||
public com.metamx.common.FileUtils.FileCopyResult getSegmentFiles(final String storageDir, final File outDir)
|
||||
public com.metamx.common.FileUtils.FileCopyResult getSegmentFiles(
|
||||
final String containerName,
|
||||
final String blobPath,
|
||||
final File outDir
|
||||
)
|
||||
throws SegmentLoadingException
|
||||
{
|
||||
prepareOutDir(outDir);
|
||||
|
||||
try {
|
||||
|
||||
final String filePath = String.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME);
|
||||
final ByteSource byteSource = new AzureByteSource(azureStorageContainer, filePath);
|
||||
final ByteSource byteSource = new AzureByteSource(azureStorage, containerName, blobPath);
|
||||
final com.metamx.common.FileUtils.FileCopyResult result = CompressionUtils.unzip(
|
||||
byteSource,
|
||||
outDir,
|
||||
|
@ -62,7 +65,7 @@ public class AzureDataSegmentPuller implements DataSegmentPuller
|
|||
true
|
||||
);
|
||||
|
||||
log.info("Loaded %d bytes from [%s] to [%s]", result.size(), filePath, outDir.getAbsolutePath());
|
||||
log.info("Loaded %d bytes from [%s] to [%s]", result.size(), blobPath, outDir.getAbsolutePath());
|
||||
return result;
|
||||
}
|
||||
catch (IOException e) {
|
||||
|
@ -74,7 +77,7 @@ public class AzureDataSegmentPuller implements DataSegmentPuller
|
|||
ioe,
|
||||
"Failed to remove output directory [%s] for segment pulled from [%s]",
|
||||
outDir.getAbsolutePath(),
|
||||
storageDir
|
||||
blobPath
|
||||
);
|
||||
}
|
||||
throw new SegmentLoadingException(e, e.getMessage());
|
||||
|
@ -87,9 +90,10 @@ public class AzureDataSegmentPuller implements DataSegmentPuller
|
|||
{
|
||||
|
||||
final Map<String, Object> loadSpec = segment.getLoadSpec();
|
||||
final String storageDir = MapUtils.getString(loadSpec, "storageDir");
|
||||
final String containerName = MapUtils.getString(loadSpec, "containerName");
|
||||
final String blobPath = MapUtils.getString(loadSpec, "blobPath");
|
||||
|
||||
getSegmentFiles(storageDir, outDir);
|
||||
getSegmentFiles(containerName, blobPath, outDir);
|
||||
}
|
||||
|
||||
public void prepareOutDir(final File outDir) throws ISE
|
||||
|
|
|
@ -41,19 +41,19 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
|
|||
{
|
||||
|
||||
private static final Logger log = new Logger(AzureDataSegmentPusher.class);
|
||||
private final AzureStorageContainer azureStorageContainer;
|
||||
private final AzureAccountConfig azureAccountConfig;
|
||||
private final AzureStorage azureStorage;
|
||||
private final AzureAccountConfig config;
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
@Inject
|
||||
public AzureDataSegmentPusher(
|
||||
AzureStorageContainer azureStorageContainer,
|
||||
AzureAccountConfig azureAccountConfig,
|
||||
AzureStorage azureStorage,
|
||||
AzureAccountConfig config,
|
||||
ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this.azureStorageContainer = azureStorageContainer;
|
||||
this.azureAccountConfig = azureAccountConfig;
|
||||
this.azureStorage = azureStorage;
|
||||
this.config = config;
|
||||
this.jsonMapper = jsonMapper;
|
||||
}
|
||||
|
||||
|
@ -88,7 +88,6 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
|
|||
final String storageDir = DataSegmentPusherUtil.getStorageDir(segment);
|
||||
|
||||
return ImmutableMap.of(
|
||||
"storage", storageDir,
|
||||
"index", String.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME),
|
||||
"descriptor", String.format("%s/%s", storageDir, AzureStorageDruidModule.DESCRIPTOR_FILE_NAME)
|
||||
);
|
||||
|
@ -100,14 +99,6 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
|
|||
return RetryUtils.retry(f, AzureUtils.AZURE_RETRY, maxTries);
|
||||
}
|
||||
|
||||
public void uploadThenDelete(final File file, final String azurePath)
|
||||
throws StorageException, IOException, URISyntaxException
|
||||
{
|
||||
azureStorageContainer.uploadBlob(file, azurePath);
|
||||
log.info("Deleting file [%s]", file);
|
||||
file.delete();
|
||||
}
|
||||
|
||||
public DataSegment uploadDataSegment(
|
||||
DataSegment segment,
|
||||
final int version,
|
||||
|
@ -117,20 +108,30 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
|
|||
)
|
||||
throws StorageException, IOException, URISyntaxException
|
||||
{
|
||||
uploadThenDelete(compressedSegmentData, azurePaths.get("index"));
|
||||
uploadThenDelete(descriptorFile, azurePaths.get("descriptor"));
|
||||
azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePaths.get("index"));
|
||||
azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor"));
|
||||
|
||||
return segment
|
||||
final DataSegment outSegment = segment
|
||||
.withSize(compressedSegmentData.length())
|
||||
.withLoadSpec(
|
||||
ImmutableMap.<String, Object>of(
|
||||
"type",
|
||||
AzureStorageDruidModule.SCHEME,
|
||||
"storageDir",
|
||||
azurePaths.get("storage")
|
||||
"containerName",
|
||||
config.getContainer(),
|
||||
"blobPath",
|
||||
azurePaths.get("index")
|
||||
)
|
||||
)
|
||||
.withBinaryVersion(version);
|
||||
|
||||
log.info("Deleting file [%s]", compressedSegmentData);
|
||||
compressedSegmentData.delete();
|
||||
|
||||
log.info("Deleting file [%s]", descriptorFile);
|
||||
descriptorFile.delete();
|
||||
|
||||
return outSegment;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -154,7 +155,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
|
|||
return uploadDataSegment(segment, version, compressedSegmentData, descriptorFile, azurePaths);
|
||||
}
|
||||
},
|
||||
azureAccountConfig.getMaxTries()
|
||||
config.getMaxTries()
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
|
|
@ -30,25 +30,32 @@ import java.io.File;
|
|||
@JsonTypeName(AzureStorageDruidModule.SCHEME)
|
||||
public class AzureLoadSpec implements LoadSpec
|
||||
{
|
||||
|
||||
@JsonProperty
|
||||
private final String storageDir;
|
||||
private final String containerName;
|
||||
|
||||
@JsonProperty
|
||||
private final String blobPath;
|
||||
|
||||
private final AzureDataSegmentPuller puller;
|
||||
|
||||
@JsonCreator
|
||||
public AzureLoadSpec(
|
||||
@JacksonInject AzureDataSegmentPuller puller,
|
||||
@JsonProperty("storageDir") String storageDir
|
||||
@JsonProperty("containerName") String containerName,
|
||||
@JsonProperty("blobPath") String blobPath,
|
||||
@JacksonInject AzureDataSegmentPuller puller
|
||||
)
|
||||
{
|
||||
Preconditions.checkNotNull(storageDir);
|
||||
this.storageDir = storageDir;
|
||||
Preconditions.checkNotNull(blobPath);
|
||||
Preconditions.checkNotNull(containerName);
|
||||
this.containerName = containerName;
|
||||
this.blobPath = blobPath;
|
||||
this.puller = puller;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LoadSpecResult loadSegment(File file) throws SegmentLoadingException
|
||||
{
|
||||
return new LoadSpecResult(puller.getSegmentFiles(storageDir, file).size());
|
||||
return new LoadSpecResult(puller.getSegmentFiles(containerName, blobPath, file).size());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,8 +20,8 @@ package io.druid.storage.azure;
|
|||
import com.metamx.common.logger.Logger;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
||||
import com.microsoft.azure.storage.blob.ListBlobItem;
|
||||
|
||||
|
@ -33,27 +33,36 @@ import java.net.URISyntaxException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class AzureStorageContainer
|
||||
public class AzureStorage
|
||||
{
|
||||
|
||||
private final Logger log = new Logger(AzureStorageContainer.class);
|
||||
private final Logger log = new Logger(AzureStorage.class);
|
||||
|
||||
private final CloudBlobContainer cloudBlobContainer;
|
||||
private final CloudBlobClient cloudBlobClient;
|
||||
|
||||
public AzureStorageContainer(
|
||||
CloudBlobContainer cloudBlobContainer
|
||||
public AzureStorage(
|
||||
CloudBlobClient cloudBlobClient
|
||||
)
|
||||
{
|
||||
this.cloudBlobContainer = cloudBlobContainer;
|
||||
this.cloudBlobClient = cloudBlobClient;
|
||||
}
|
||||
|
||||
public List<String> emptyCloudBlobDirectory(final String path)
|
||||
public CloudBlobContainer getCloudBlobContainer(final String containerName)
|
||||
throws StorageException, URISyntaxException
|
||||
{
|
||||
CloudBlobContainer cloudBlobContainer = cloudBlobClient.getContainerReference(containerName);
|
||||
cloudBlobContainer.createIfNotExists();
|
||||
|
||||
return cloudBlobContainer;
|
||||
}
|
||||
|
||||
public List<String> emptyCloudBlobDirectory(final String containerName, final String virtualDirPath)
|
||||
throws StorageException, URISyntaxException
|
||||
{
|
||||
List<String> deletedFiles = new ArrayList<>();
|
||||
CloudBlobContainer container = getCloudBlobContainer(containerName);
|
||||
|
||||
for (ListBlobItem blobItem : cloudBlobContainer.listBlobs(path, true, null, null, null))
|
||||
{
|
||||
for (ListBlobItem blobItem : container.listBlobs(virtualDirPath, true, null, null, null)) {
|
||||
CloudBlob cloudBlob = (CloudBlob) blobItem;
|
||||
log.info("Removing file[%s] from Azure.", cloudBlob.getName());
|
||||
if (cloudBlob.deleteIfExists()) {
|
||||
|
@ -61,27 +70,30 @@ public class AzureStorageContainer
|
|||
}
|
||||
}
|
||||
|
||||
if (deletedFiles.isEmpty())
|
||||
{
|
||||
log.warn("No files were deleted on the following Azure path: [%s]", path);
|
||||
if (deletedFiles.isEmpty()) {
|
||||
log.warn("No files were deleted on the following Azure path: [%s]", virtualDirPath);
|
||||
}
|
||||
|
||||
return deletedFiles;
|
||||
|
||||
}
|
||||
|
||||
public void uploadBlob(final File file, final String destination)
|
||||
public void uploadBlob(final File file, final String containerName, final String blobPath)
|
||||
throws IOException, StorageException, URISyntaxException
|
||||
|
||||
{
|
||||
CloudBlobContainer container = getCloudBlobContainer(containerName);
|
||||
try (FileInputStream stream = new FileInputStream(file)) {
|
||||
CloudBlockBlob blob = cloudBlobContainer.getBlockBlobReference(destination);
|
||||
CloudBlockBlob blob = container.getBlockBlobReference(blobPath);
|
||||
blob.upload(stream, file.length());
|
||||
}
|
||||
}
|
||||
|
||||
public InputStream getBlobInputStream(final String filePath) throws URISyntaxException, StorageException
|
||||
public InputStream getBlobInputStream(final String containerName, final String blobPath)
|
||||
throws URISyntaxException, StorageException
|
||||
{
|
||||
return cloudBlobContainer.getBlockBlobReference(filePath).openInputStream();
|
||||
CloudBlobContainer container = getCloudBlobContainer(containerName);
|
||||
return container.getBlockBlobReference(blobPath).openInputStream();
|
||||
}
|
||||
|
||||
}
|
|
@ -23,9 +23,7 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.inject.Binder;
|
||||
import com.google.inject.Provides;
|
||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
import io.druid.guice.Binders;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.LazySingleton;
|
||||
|
@ -90,10 +88,10 @@ public class AzureStorageDruidModule implements DruidModule
|
|||
|
||||
@Provides
|
||||
@LazySingleton
|
||||
public CloudStorageAccount getCloudStorageAccount(final AzureAccountConfig config)
|
||||
public CloudBlobClient getCloudBlobClient(final AzureAccountConfig config)
|
||||
throws URISyntaxException, InvalidKeyException
|
||||
{
|
||||
return CloudStorageAccount.parse(
|
||||
CloudStorageAccount account = CloudStorageAccount.parse(
|
||||
String.format(
|
||||
STORAGE_CONNECTION_STRING,
|
||||
config.getProtocol(),
|
||||
|
@ -101,30 +99,16 @@ public class AzureStorageDruidModule implements DruidModule
|
|||
config.getKey()
|
||||
)
|
||||
);
|
||||
|
||||
return account.createCloudBlobClient();
|
||||
}
|
||||
|
||||
@Provides
|
||||
@LazySingleton
|
||||
public CloudBlobContainer getCloudBlobContainer(
|
||||
final AzureAccountConfig config,
|
||||
final CloudStorageAccount cloudStorageAccount
|
||||
)
|
||||
throws URISyntaxException, StorageException
|
||||
{
|
||||
CloudBlobClient blobClient = cloudStorageAccount.createCloudBlobClient();
|
||||
CloudBlobContainer cloudBlobContainer = blobClient.getContainerReference(config.getContainer());
|
||||
|
||||
cloudBlobContainer.createIfNotExists();
|
||||
|
||||
return cloudBlobContainer;
|
||||
}
|
||||
|
||||
@Provides
|
||||
@LazySingleton
|
||||
public AzureStorageContainer getAzureStorageContainer(
|
||||
final CloudBlobContainer cloudBlobContainer
|
||||
public AzureStorage getAzureStorageContainer(
|
||||
final CloudBlobClient cloudBlobClient
|
||||
)
|
||||
{
|
||||
return new AzureStorageContainer(cloudBlobContainer);
|
||||
return new AzureStorage(cloudBlobClient);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,18 +29,20 @@ import static org.easymock.EasyMock.*;
|
|||
|
||||
public class AzureByteSourceTest extends EasyMockSupport
|
||||
{
|
||||
|
||||
@Test
|
||||
public void openStreamTest() throws IOException, URISyntaxException, StorageException
|
||||
{
|
||||
final String filePath = "/path/to/file";
|
||||
AzureStorageContainer azureStorageContainer = createMock(AzureStorageContainer.class);
|
||||
final String containerName = "container";
|
||||
final String blobPath = "/path/to/file";
|
||||
AzureStorage azureStorage = createMock(AzureStorage.class);
|
||||
InputStream stream = createMock(InputStream.class);
|
||||
|
||||
expect(azureStorageContainer.getBlobInputStream(filePath)).andReturn(stream);
|
||||
expect(azureStorage.getBlobInputStream(containerName, blobPath)).andReturn(stream);
|
||||
|
||||
replayAll();
|
||||
|
||||
AzureByteSource byteSource = new AzureByteSource(azureStorageContainer, filePath);
|
||||
AzureByteSource byteSource = new AzureByteSource(azureStorage, containerName, blobPath);
|
||||
|
||||
byteSource.openStream();
|
||||
|
||||
|
@ -50,14 +52,23 @@ public class AzureByteSourceTest extends EasyMockSupport
|
|||
@Test(expected = IOException.class)
|
||||
public void openStreamWithRecoverableErrorTest() throws URISyntaxException, StorageException, IOException
|
||||
{
|
||||
final String filePath = "/path/to/file";
|
||||
AzureStorageContainer azureStorageContainer = createMock(AzureStorageContainer.class);
|
||||
final String containerName = "container";
|
||||
final String blobPath = "/path/to/file";
|
||||
AzureStorage azureStorage = createMock(AzureStorage.class);
|
||||
|
||||
expect(azureStorageContainer.getBlobInputStream(filePath)).andThrow(new StorageException("", "", 500, null, null));
|
||||
expect(azureStorage.getBlobInputStream(containerName, blobPath)).andThrow(
|
||||
new StorageException(
|
||||
"",
|
||||
"",
|
||||
500,
|
||||
null,
|
||||
null
|
||||
)
|
||||
);
|
||||
|
||||
replayAll();
|
||||
|
||||
AzureByteSource byteSource = new AzureByteSource(azureStorageContainer, filePath);
|
||||
AzureByteSource byteSource = new AzureByteSource(azureStorage, containerName, blobPath);
|
||||
|
||||
byteSource.openStream();
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -36,11 +37,14 @@ import static org.easymock.EasyMock.*;
|
|||
public class AzureDataSegmentKillerTest extends EasyMockSupport
|
||||
{
|
||||
|
||||
private static final String containerName = "container";
|
||||
private static final String blobPath = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
|
||||
|
||||
private static final DataSegment dataSegment = new DataSegment(
|
||||
"test",
|
||||
new Interval("2015-04-12/2015-04-13"),
|
||||
"1",
|
||||
ImmutableMap.<String, Object>of("storageDir", "/path/to/storage/"),
|
||||
ImmutableMap.<String, Object>of("containerName", containerName, "blobPath", blobPath),
|
||||
null,
|
||||
null,
|
||||
new NoneShardSpec(),
|
||||
|
@ -48,12 +52,12 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
|
|||
1
|
||||
);
|
||||
|
||||
private AzureStorageContainer azureStorageContainer;
|
||||
private AzureStorage azureStorage;
|
||||
|
||||
@Before
|
||||
public void before()
|
||||
{
|
||||
azureStorageContainer = createMock(AzureStorageContainer.class);
|
||||
azureStorage = createMock(AzureStorage.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -61,12 +65,13 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
|
|||
{
|
||||
|
||||
List<String> deletedFiles = new ArrayList<>();
|
||||
final String dirPath = Paths.get(blobPath).getParent().toString();
|
||||
|
||||
expect(azureStorageContainer.emptyCloudBlobDirectory("/path/to/storage/")).andReturn(deletedFiles);
|
||||
expect(azureStorage.emptyCloudBlobDirectory(containerName, dirPath)).andReturn(deletedFiles);
|
||||
|
||||
replayAll();
|
||||
|
||||
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorageContainer);
|
||||
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage);
|
||||
|
||||
killer.kill(dataSegment);
|
||||
|
||||
|
@ -77,7 +82,9 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
|
|||
public void killWithErrorTest() throws SegmentLoadingException, URISyntaxException, StorageException
|
||||
{
|
||||
|
||||
expect(azureStorageContainer.emptyCloudBlobDirectory("/path/to/storage/")).andThrow(
|
||||
String dirPath = Paths.get(blobPath).getParent().toString();
|
||||
|
||||
expect(azureStorage.emptyCloudBlobDirectory(containerName, dirPath)).andThrow(
|
||||
new StorageException(
|
||||
"",
|
||||
"",
|
||||
|
@ -89,7 +96,7 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
|
|||
|
||||
replayAll();
|
||||
|
||||
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorageContainer);
|
||||
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(azureStorage);
|
||||
|
||||
killer.kill(dataSegment);
|
||||
|
||||
|
|
|
@ -19,7 +19,6 @@ package io.druid.storage.azure;
|
|||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.metamx.common.FileUtils;
|
||||
import com.metamx.common.MapUtils;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import io.druid.segment.loading.SegmentLoadingException;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
@ -35,7 +34,6 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Files;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.easymock.EasyMock.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -45,13 +43,15 @@ import static org.junit.Assert.assertTrue;
|
|||
public class AzureDataSegmentPullerTest extends EasyMockSupport
|
||||
{
|
||||
|
||||
private AzureStorageContainer azureStorageContainer;
|
||||
private AzureStorage azureStorage;
|
||||
private static final String SEGMENT_FILE_NAME = "segment";
|
||||
private static final String containerName = "container";
|
||||
private static final String blobPath = "/path/to/storage/index.zip";
|
||||
private static final DataSegment dataSegment = new DataSegment(
|
||||
"test",
|
||||
new Interval("2015-04-12/2015-04-13"),
|
||||
"1",
|
||||
ImmutableMap.<String, Object>of("storageDir", "/path/to/storage/"),
|
||||
ImmutableMap.<String, Object>of("containerName", containerName, "blobPath", blobPath),
|
||||
null,
|
||||
null,
|
||||
new NoneShardSpec(),
|
||||
|
@ -62,7 +62,7 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
|
|||
@Before
|
||||
public void before()
|
||||
{
|
||||
azureStorageContainer = createMock(AzureStorageContainer.class);
|
||||
azureStorage = createMock(AzureStorage.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -73,16 +73,15 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
|
|||
pulledFile.deleteOnExit();
|
||||
final File toDir = Files.createTempDirectory("druid").toFile();
|
||||
toDir.deleteOnExit();
|
||||
final File zipFilePath = new File(pulledFile.getParent(), AzureStorageDruidModule.INDEX_ZIP_FILE_NAME);
|
||||
final InputStream zipStream = new FileInputStream(pulledFile);
|
||||
|
||||
expect(azureStorageContainer.getBlobInputStream(zipFilePath.getAbsolutePath())).andReturn(zipStream);
|
||||
expect(azureStorage.getBlobInputStream(containerName, blobPath)).andReturn(zipStream);
|
||||
|
||||
replayAll();
|
||||
|
||||
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorageContainer);
|
||||
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);
|
||||
|
||||
FileUtils.FileCopyResult result = puller.getSegmentFiles(pulledFile.getParent(), toDir);
|
||||
FileUtils.FileCopyResult result = puller.getSegmentFiles(containerName, blobPath, toDir);
|
||||
|
||||
File expected = new File(toDir, SEGMENT_FILE_NAME);
|
||||
assertEquals(value.length(), result.size());
|
||||
|
@ -97,14 +96,10 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
|
|||
throws IOException, URISyntaxException, StorageException, SegmentLoadingException
|
||||
{
|
||||
|
||||
final File fromDir = Files.createTempDirectory("druidA").toFile();
|
||||
fromDir.deleteOnExit();
|
||||
final File outDir = Files.createTempDirectory("druid").toFile();
|
||||
outDir.deleteOnExit();
|
||||
|
||||
final File zipFilePath = new File(fromDir.getAbsolutePath(), AzureStorageDruidModule.INDEX_ZIP_FILE_NAME);
|
||||
|
||||
expect(azureStorageContainer.getBlobInputStream(zipFilePath.getAbsolutePath())).andThrow(
|
||||
expect(azureStorage.getBlobInputStream(containerName, blobPath)).andThrow(
|
||||
new StorageException(
|
||||
"error",
|
||||
"error",
|
||||
|
@ -116,9 +111,9 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
|
|||
|
||||
replayAll();
|
||||
|
||||
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorageContainer);
|
||||
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);
|
||||
|
||||
puller.getSegmentFiles(fromDir.getAbsolutePath(), outDir);
|
||||
puller.getSegmentFiles(containerName, blobPath, outDir);
|
||||
|
||||
assertFalse(outDir.exists());
|
||||
|
||||
|
@ -130,14 +125,12 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
|
|||
public void getSegmentFilesTest() throws SegmentLoadingException
|
||||
{
|
||||
final File outDir = new File("");
|
||||
final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
|
||||
final String storageDir = MapUtils.getString(loadSpec, "storageDir");
|
||||
final FileUtils.FileCopyResult result = createMock(FileUtils.FileCopyResult.class);
|
||||
final AzureDataSegmentPuller puller = createMockBuilder(AzureDataSegmentPuller.class).withConstructor(
|
||||
azureStorageContainer
|
||||
).addMockedMethod("getSegmentFiles", String.class, File.class).createMock();
|
||||
azureStorage
|
||||
).addMockedMethod("getSegmentFiles", String.class, String.class, File.class).createMock();
|
||||
|
||||
expect(puller.getSegmentFiles(storageDir, outDir)).andReturn(result);
|
||||
expect(puller.getSegmentFiles(containerName, blobPath, outDir)).andReturn(result);
|
||||
|
||||
replayAll();
|
||||
|
||||
|
@ -153,7 +146,7 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
|
|||
File outDir = Files.createTempDirectory("druid").toFile();
|
||||
|
||||
try {
|
||||
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorageContainer);
|
||||
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);
|
||||
puller.prepareOutDir(outDir);
|
||||
|
||||
assertTrue(outDir.exists());
|
||||
|
|
|
@ -32,9 +32,7 @@ import org.junit.Test;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Files;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
|
@ -42,11 +40,13 @@ import static org.easymock.EasyMock.*;
|
|||
|
||||
public class AzureDataSegmentPusherTest extends EasyMockSupport
|
||||
{
|
||||
private static final String containerName = "container";
|
||||
private static final String blobPath = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
|
||||
private static final DataSegment dataSegment = new DataSegment(
|
||||
"test",
|
||||
new Interval("2015-04-12/2015-04-13"),
|
||||
"1",
|
||||
ImmutableMap.<String, Object>of("storageDir", "/path/to/storage/"),
|
||||
ImmutableMap.<String, Object>of("containerName", containerName, "blobPath", blobPath),
|
||||
null,
|
||||
null,
|
||||
new NoneShardSpec(),
|
||||
|
@ -54,14 +54,14 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
|
|||
1
|
||||
);
|
||||
|
||||
private AzureStorageContainer azureStorageContainer;
|
||||
private AzureStorage azureStorage;
|
||||
private AzureAccountConfig azureAccountConfig;
|
||||
private ObjectMapper jsonMapper;
|
||||
|
||||
@Before
|
||||
public void before()
|
||||
{
|
||||
azureStorageContainer = createMock(AzureStorageContainer.class);
|
||||
azureStorage = createMock(AzureStorage.class);
|
||||
azureAccountConfig = createMock(AzureAccountConfig.class);
|
||||
jsonMapper = createMock(ObjectMapper.class);
|
||||
|
||||
|
@ -71,11 +71,10 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
|
|||
public void getAzurePathsTest()
|
||||
{
|
||||
final String storageDir = DataSegmentPusherUtil.getStorageDir(dataSegment);
|
||||
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorageContainer, azureAccountConfig, jsonMapper);
|
||||
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
|
||||
|
||||
Map<String, String> paths = pusher.getAzurePaths(dataSegment);
|
||||
|
||||
assertEquals(storageDir, paths.get("storage"));
|
||||
assertEquals(String.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME), paths.get("index"));
|
||||
assertEquals(
|
||||
String.format("%s/%s", storageDir, AzureStorageDruidModule.DESCRIPTOR_FILE_NAME),
|
||||
|
@ -83,50 +82,23 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void uploadThenDeleteTest() throws StorageException, IOException, URISyntaxException
|
||||
{
|
||||
final File file = AzureTestUtils.createZipTempFile("segment", "bucket");
|
||||
file.deleteOnExit();
|
||||
final String azureDestPath = "/azure/path/";
|
||||
azureStorageContainer.uploadBlob(file, azureDestPath);
|
||||
expectLastCall();
|
||||
|
||||
replayAll();
|
||||
|
||||
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorageContainer, azureAccountConfig, jsonMapper);
|
||||
|
||||
pusher.uploadThenDelete(file, azureDestPath);
|
||||
|
||||
verifyAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void uploadDataSegmentTest() throws StorageException, IOException, URISyntaxException
|
||||
{
|
||||
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
|
||||
final int version = 9;
|
||||
final File compressedSegmentData = AzureTestUtils.createZipTempFile("segment", "bucket");
|
||||
compressedSegmentData.deleteOnExit();
|
||||
final File descriptorFile = Files.createTempFile("descriptor", ".json").toFile();
|
||||
descriptorFile.deleteOnExit();
|
||||
final Map<String, String> azurePaths = ImmutableMap.of(
|
||||
"index",
|
||||
"/path/to/azure/storage",
|
||||
"descriptor",
|
||||
"/path/to/azure/storage",
|
||||
"storage",
|
||||
"/path/to/azure/storage"
|
||||
);
|
||||
final File compressedSegmentData = new File("index.zip");
|
||||
final File descriptorFile = new File("descriptor.json");
|
||||
final Map<String, String> azurePaths = pusher.getAzurePaths(dataSegment);
|
||||
|
||||
azureStorageContainer.uploadBlob(compressedSegmentData, azurePaths.get("index"));
|
||||
expect(azureAccountConfig.getContainer()).andReturn(containerName).times(3);
|
||||
azureStorage.uploadBlob(compressedSegmentData, containerName, azurePaths.get("index"));
|
||||
expectLastCall();
|
||||
azureStorageContainer.uploadBlob(descriptorFile, azurePaths.get("descriptor"));
|
||||
azureStorage.uploadBlob(descriptorFile, containerName, azurePaths.get("descriptor"));
|
||||
expectLastCall();
|
||||
|
||||
replayAll();
|
||||
|
||||
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorageContainer, azureAccountConfig, jsonMapper);
|
||||
|
||||
DataSegment pushedDataSegment = pusher.uploadDataSegment(
|
||||
dataSegment,
|
||||
version,
|
||||
|
@ -139,23 +111,10 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
|
|||
assertEquals(version, (int) pushedDataSegment.getBinaryVersion());
|
||||
Map<String, Object> loadSpec = pushedDataSegment.getLoadSpec();
|
||||
assertEquals(AzureStorageDruidModule.SCHEME, MapUtils.getString(loadSpec, "type"));
|
||||
assertEquals(azurePaths.get("storage"), MapUtils.getString(loadSpec, "storageDir"));
|
||||
assertEquals(azurePaths.get("index"), MapUtils.getString(loadSpec, "blobPath"));
|
||||
|
||||
verifyAll();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void t() throws IOException
|
||||
{
|
||||
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorageContainer, azureAccountConfig, jsonMapper);
|
||||
|
||||
File dir = Files.createTempDirectory("druid").toFile();
|
||||
dir.deleteOnExit();
|
||||
|
||||
File x = pusher.createCompressedSegmentDataFile(dir);
|
||||
|
||||
System.out.println(x);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue