Mirror of https://github.com/apache/druid.git

Commit 716a0e1457: Merge pull request #2206 from druid-io/add-ds-test ("add pusher tests for all deep storages").
pom.xml (druid-azure-extensions):

@@ -16,7 +16,8 @@
   ~ limitations under the License.
   -->

-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>io.druid.extensions</groupId>
   <artifactId>druid-azure-extensions</artifactId>
@@ -36,11 +37,11 @@
       <artifactId>druid-api</artifactId>
     </dependency>

    <dependency>
      <groupId>com.microsoft.azure</groupId>
      <artifactId>azure-storage</artifactId>
      <version>2.1.0</version>
    </dependency>

    <!-- Tests -->
    <dependency>
@@ -48,6 +49,12 @@
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-server</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
    <dependency>
      <groupId>org.easymock</groupId>
      <artifactId>easymock</artifactId>
AzureAccountConfig.java:

@@ -45,6 +45,31 @@ public class AzureAccountConfig
   @NotNull
   private String container;

+  public void setProtocol(String protocol)
+  {
+    this.protocol = protocol;
+  }
+
+  public void setMaxTries(int maxTries)
+  {
+    this.maxTries = maxTries;
+  }
+
+  public void setAccount(String account)
+  {
+    this.account = account;
+  }
+
+  public void setKey(String key)
+  {
+    this.key = key;
+  }
+
+  public void setContainer(String container)
+  {
+    this.container = container;
+  }
+
   public String getProtocol() { return protocol; }

   public int getMaxTries() { return maxTries; }
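Note on the setters above: they let tests build a real AzureAccountConfig instead of mocking it, which is exactly what the reworked test setup further down in this commit does. A minimal sketch of the intended test-side usage (values are illustrative):

    // Plain bean construction in a test; no EasyMock involved.
    AzureAccountConfig config = new AzureAccountConfig();
    config.setAccount("account");
    config.setContainer("container");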
AzureDataSegmentPusher.java:

@@ -64,15 +64,6 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
     return null;
   }

-  public File createCompressedSegmentDataFile(final File indexFilesDir) throws IOException
-  {
-    final File zipOutFile = File.createTempFile("index", ".zip");
-    CompressionUtils.zip(indexFilesDir, zipOutFile);
-
-    return zipOutFile;
-
-  }
-
   public File createSegmentDescriptorFile(final ObjectMapper jsonMapper, final DataSegment segment) throws
       IOException
   {
@@ -98,6 +89,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
   public DataSegment uploadDataSegment(
       DataSegment segment,
       final int version,
+      final long size,
       final File compressedSegmentData,
       final File descriptorFile,
       final Map<String, String> azurePaths
@@ -108,7 +100,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
     azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor"));

     final DataSegment outSegment = segment
-        .withSize(compressedSegmentData.length())
+        .withSize(size)
         .withLoadSpec(
             ImmutableMap.<String, Object>of(
                 "type",
@@ -137,18 +129,23 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
     log.info("Uploading [%s] to Azure.", indexFilesDir);

     final int version = SegmentUtils.getVersionFromDir(indexFilesDir);
-    final File compressedSegmentData = createCompressedSegmentDataFile(indexFilesDir);
-    final File descriptorFile = createSegmentDescriptorFile(jsonMapper, segment);
-    final Map<String, String> azurePaths = getAzurePaths(segment);
+    File zipOutFile = null;
+    File descriptorFile = null;

     try {
+      final File outFile = zipOutFile = File.createTempFile("index", ".zip");
+      final long size = CompressionUtils.zip(indexFilesDir, zipOutFile);
+
+      final File descFile = descriptorFile = createSegmentDescriptorFile(jsonMapper, segment);
+      final Map<String, String> azurePaths = getAzurePaths(segment);
+
       return AzureUtils.retryAzureOperation(
           new Callable<DataSegment>()
           {
             @Override
             public DataSegment call() throws Exception
             {
-              return uploadDataSegment(segment, version, compressedSegmentData, descriptorFile, azurePaths);
+              return uploadDataSegment(segment, version, size, outFile, descFile, azurePaths);
             }
           },
           config.getMaxTries()
@@ -157,5 +154,16 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
     catch (Exception e) {
       throw Throwables.propagate(e);
     }
+    finally {
+      if (zipOutFile != null) {
+        log.info("Deleting zipped index File[%s]", zipOutFile);
+        zipOutFile.delete();
+      }
+
+      if (descriptorFile != null) {
+        log.info("Deleting descriptor file[%s]", descriptorFile);
+        descriptorFile.delete();
+      }
+    }
   }
 }
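The reworked push() above boils down to one resource-lifecycle pattern: create the temp files inside the try block, take the segment size from the return value of CompressionUtils.zip() rather than re-reading the zip file's length afterward, and clean up in finally behind null guards so that a failure before createTempFile() cannot turn into a NullPointerException during cleanup. The final outFile/descFile aliases exist because the anonymous Callable can only capture final locals, while the mutable zipOutFile/descriptorFile are what finally deletes. A minimal sketch of the shape (abbreviated, not the exact Druid code):

    File zipOutFile = null;
    try {
      final File outFile = zipOutFile = File.createTempFile("index", ".zip");
      final long size = CompressionUtils.zip(indexFilesDir, outFile); // returns bytes written
      // ... upload outFile and build the returned DataSegment with .withSize(size) ...
    }
    finally {
      if (zipOutFile != null) { // guard: createTempFile may have thrown before assignment
        zipOutFile.delete();
      }
    }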
AzureDataSegmentPusherTest.java:

@@ -21,27 +21,36 @@ package io.druid.storage.azure;

 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
 import com.metamx.common.MapUtils;
 import com.microsoft.azure.storage.StorageException;
+import io.druid.jackson.DefaultObjectMapper;
 import io.druid.segment.loading.DataSegmentPusherUtil;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.easymock.EasyMockSupport;
 import org.joda.time.Interval;
+import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;

 import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.Map;

-import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;

 public class AzureDataSegmentPusherTest extends EasyMockSupport
 {
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
+
   private static final String containerName = "container";
   private static final String blobPath = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
   private static final DataSegment dataSegment = new DataSegment(
@@ -64,9 +73,40 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
   public void before()
   {
     azureStorage = createMock(AzureStorage.class);
-    azureAccountConfig = createMock(AzureAccountConfig.class);
-    jsonMapper = createMock(ObjectMapper.class);
+    azureAccountConfig = new AzureAccountConfig();
+    azureAccountConfig.setAccount("account");
+    azureAccountConfig.setContainer("container");
+
+    jsonMapper = new DefaultObjectMapper();
+  }
+
+  @Test
+  public void testPush() throws Exception
+  {
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
+
+    // Create a mock segment on disk
+    File tmp = tempFolder.newFile("version.bin");
+
+    final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
+    Files.write(data, tmp);
+    final long size = data.length;
+
+    DataSegment segmentToPush = new DataSegment(
+        "foo",
+        new Interval("2015/2016"),
+        "0",
+        Maps.<String, Object>newHashMap(),
+        Lists.<String>newArrayList(),
+        Lists.<String>newArrayList(),
+        new NoneShardSpec(),
+        0,
+        size
+    );
+
+    DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
+
+    Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
   }

   @Test
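A note on the four bytes written to version.bin in testPush(): push() calls SegmentUtils.getVersionFromDir(indexFilesDir) (see the pusher hunk above), which parses version.bin as a four-byte integer version, so {0x0, 0x0, 0x0, 0x1} makes the temporary folder look like a valid version-1 segment directory and lets the push run end to end against the mocked storage.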
@@ -93,7 +133,6 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     final File descriptorFile = new File("descriptor.json");
     final Map<String, String> azurePaths = pusher.getAzurePaths(dataSegment);

-    expect(azureAccountConfig.getContainer()).andReturn(containerName).times(3);
     azureStorage.uploadBlob(compressedSegmentData, containerName, azurePaths.get("index"));
     expectLastCall();
     azureStorage.uploadBlob(descriptorFile, containerName, azurePaths.get("descriptor"));
@@ -104,6 +143,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     DataSegment pushedDataSegment = pusher.uploadDataSegment(
         dataSegment,
         version,
+        0, // empty file
        compressedSegmentData,
        descriptorFile,
        azurePaths
@@ -116,7 +156,5 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     assertEquals(azurePaths.get("index"), MapUtils.getString(loadSpec, "blobPath"));

     verifyAll();
-
   }
-
 }
pom.xml (druid-cloudfiles-extensions):

@@ -20,87 +20,93 @@
 -->

 <project
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
     xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
   <modelVersion>4.0.0</modelVersion>
   <groupId>io.druid.extensions</groupId>
   <artifactId>druid-cloudfiles-extensions</artifactId>
   <name>druid-cloudfiles-extensions</name>
   <description>druid-cloudfiles-extensions</description>

   <parent>
     <groupId>io.druid</groupId>
     <artifactId>druid</artifactId>
     <version>0.9.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <jclouds.version>1.9.1</jclouds.version>
     <!-- The version of guice is forced to 3.0 since JClouds 1.9.1 does not
          work with guice 4.0-beta -->
     <guice.version>3.0</guice.version>
   </properties>

   <dependencies>
     <dependency>
       <groupId>io.druid</groupId>
       <artifactId>druid-api</artifactId>
     </dependency>
     <dependency>
       <groupId>com.google.inject</groupId>
       <artifactId>guice</artifactId>
       <version>${guice.version}</version>
       <!--$NO-MVN-MAN-VER$ -->
     </dependency>
     <dependency>
       <groupId>com.google.inject.extensions</groupId>
       <artifactId>guice-servlet</artifactId>
       <version>${guice.version}</version>
       <!--$NO-MVN-MAN-VER$ -->
     </dependency>
     <dependency>
       <groupId>com.google.inject.extensions</groupId>
       <artifactId>guice-multibindings</artifactId>
       <version>${guice.version}</version>
       <!--$NO-MVN-MAN-VER$ -->
     </dependency>
     <!-- jclouds dependencies -->
     <dependency>
       <groupId>org.apache.jclouds.driver</groupId>
       <artifactId>jclouds-slf4j</artifactId>
       <version>${jclouds.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.jclouds.driver</groupId>
       <artifactId>jclouds-sshj</artifactId>
       <version>${jclouds.version}</version>
     </dependency>
     <!-- Rackspace US dependencies -->
     <dependency>
       <groupId>org.apache.jclouds.provider</groupId>
       <artifactId>rackspace-cloudfiles-us</artifactId>
       <version>${jclouds.version}</version>
     </dependency>
     <!-- Rackspace UK dependencies -->
     <dependency>
       <groupId>org.apache.jclouds.provider</groupId>
       <artifactId>rackspace-cloudfiles-uk</artifactId>
       <version>${jclouds.version}</version>
     </dependency>

     <!-- Tests -->
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.easymock</groupId>
       <artifactId>easymock</artifactId>
       <scope>test</scope>
     </dependency>
-  </dependencies>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-server</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>

 </project>
CloudFilesDataSegmentPusher.java:

@@ -68,13 +68,17 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
   public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
   {
     final String segmentPath = CloudFilesUtils.buildCloudFilesPath(this.config.getBasePath(), inSegment);
-    final File descriptorFile = File.createTempFile("descriptor", ".json");
-    final File zipOutFile = File.createTempFile("druid", "index.zip");
-    final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
-
-    log.info("Copying segment[%s] to CloudFiles at location[%s]", inSegment.getIdentifier(), segmentPath);
+    File descriptorFile = null;
+    File zipOutFile = null;

     try {
+      final File descFile = descriptorFile = File.createTempFile("descriptor", ".json");
+      final File outFile = zipOutFile = File.createTempFile("druid", "index.zip");
+
+      final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
+
+      log.info("Copying segment[%s] to CloudFiles at location[%s]", inSegment.getIdentifier(), segmentPath);
       return CloudFilesUtils.retryCloudFilesOperation(
           new Callable<DataSegment>()
           {
@@ -82,17 +86,17 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
             public DataSegment call() throws Exception
             {
               CloudFilesObject segmentData = new CloudFilesObject(
-                  segmentPath, zipOutFile, objectApi.getRegion(),
+                  segmentPath, outFile, objectApi.getRegion(),
                   objectApi.getContainer()
               );
               log.info("Pushing %s.", segmentData.getPath());
               objectApi.put(segmentData);

-              try (FileOutputStream stream = new FileOutputStream(descriptorFile)) {
+              try (FileOutputStream stream = new FileOutputStream(descFile)) {
                 stream.write(jsonMapper.writeValueAsBytes(inSegment));
               }
               CloudFilesObject descriptorData = new CloudFilesObject(
-                  segmentPath, descriptorFile,
+                  segmentPath, descFile,
                   objectApi.getRegion(), objectApi.getContainer()
               );
               log.info("Pushing %s.", descriptorData.getPath());
@@ -123,11 +127,15 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
       throw Throwables.propagate(e);
     }
     finally {
-      log.info("Deleting zipped index File[%s]", zipOutFile);
-      zipOutFile.delete();
+      if (zipOutFile != null) {
+        log.info("Deleting zipped index File[%s]", zipOutFile);
+        zipOutFile.delete();
+      }

-      log.info("Deleting descriptor file[%s]", descriptorFile);
-      descriptorFile.delete();
+      if (descriptorFile != null) {
+        log.info("Deleting descriptor file[%s]", descriptorFile);
+        descriptorFile.delete();
+      }
     }
   }
 }
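This is the same lifecycle fix as in AzureDataSegmentPusher above: temp-file creation moves inside the try, the final descFile/outFile aliases make the files capturable by the anonymous Callable, and the finally block now null-checks before logging and deleting, so a failure in createTempFile() or zip() no longer breaks cleanup.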
CloudFilesDataSegmentPusherConfig.java:

@@ -28,7 +28,6 @@ import javax.validation.constraints.NotNull;
  */
 public class CloudFilesDataSegmentPusherConfig
 {
-
   @JsonProperty
   @NotNull
   private String region;
@@ -44,6 +43,26 @@ public class CloudFilesDataSegmentPusherConfig
   @JsonProperty
   private int operationMaxRetries = 10;

+  public void setRegion(String region)
+  {
+    this.region = region;
+  }
+
+  public void setContainer(String container)
+  {
+    this.container = container;
+  }
+
+  public void setBasePath(String basePath)
+  {
+    this.basePath = basePath;
+  }
+
+  public void setOperationMaxRetries(int operationMaxRetries)
+  {
+    this.operationMaxRetries = operationMaxRetries;
+  }
+
   public String getRegion()
   {
     Preconditions.checkNotNull(region);
CloudFilesDataSegmentPusherTest.java (new file):

@@ -0,0 +1,93 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.cloudfiles;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.easymock.EasyMock;
+import org.jclouds.io.Payload;
+import org.jclouds.openstack.swift.v1.features.ObjectApi;
+import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+/**
+ */
+public class CloudFilesDataSegmentPusherTest
+{
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Test
+  public void testPush() throws Exception
+  {
+    ObjectApi objectApi = EasyMock.createStrictMock(ObjectApi.class);
+    EasyMock.expect(objectApi.put(EasyMock.anyString(), EasyMock.<Payload>anyObject())).andReturn(null).atLeastOnce();
+    EasyMock.replay(objectApi);
+
+    CloudFilesApi api = EasyMock.createStrictMock(CloudFilesApi.class);
+    EasyMock.expect(api.getObjectApi(EasyMock.anyString(), EasyMock.anyString()))
+            .andReturn(objectApi)
+            .atLeastOnce();
+    EasyMock.replay(api);
+
+    CloudFilesDataSegmentPusherConfig config = new CloudFilesDataSegmentPusherConfig();
+    config.setRegion("region");
+    config.setContainer("container");
+    config.setBasePath("basePath");
+
+    CloudFilesDataSegmentPusher pusher = new CloudFilesDataSegmentPusher(api, config, new DefaultObjectMapper());
+
+    // Create a mock segment on disk
+    File tmp = tempFolder.newFile("version.bin");
+
+    final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
+    Files.write(data, tmp);
+    final long size = data.length;
+
+    DataSegment segmentToPush = new DataSegment(
+        "foo",
+        new Interval("2015/2016"),
+        "0",
+        Maps.<String, Object>newHashMap(),
+        Lists.<String>newArrayList(),
+        Lists.<String>newArrayList(),
+        new NoneShardSpec(),
+        0,
+        size
+    );
+
+    DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
+
+    Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
+
+    EasyMock.verify(api);
+  }
+}
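The strict mocks keep this test hermetic: ObjectApi.put(...) is stubbed to return null so push() completes without a real Swift endpoint, and the final EasyMock.verify(api) confirms the pusher actually looked up the ObjectApi for the configured region and container.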
HdfsDataSegmentPusherConfig.java:

@@ -26,7 +26,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 public class HdfsDataSegmentPusherConfig
 {
   @JsonProperty
-  public String storageDirectory = "";
+  private String storageDirectory = "";
+
+  public void setStorageDirectory(String storageDirectory)
+  {
+    this.storageDirectory = storageDirectory;
+  }

   public String getStorageDirectory()
   {
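Making storageDirectory private should not affect JSON binding, since Jackson reads a private field annotated with @JsonProperty; the new setter exists for programmatic configuration in tests. A small sketch of both paths (the path value is illustrative; assumes com.fasterxml.jackson.databind.ObjectMapper is on the classpath):

    // Programmatic, as the new HDFS pusher test does:
    HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
    config.setStorageDirectory("/tmp/druid/segments");

    // JSON binding still works through the annotated private field:
    HdfsDataSegmentPusherConfig fromJson = new ObjectMapper().readValue(
        "{\"storageDirectory\": \"/tmp/druid/segments\"}",
        HdfsDataSegmentPusherConfig.class
    );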
HdfsDataSegmentPusherTest.java (new file):

@@ -0,0 +1,78 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.hdfs;
+
+import com.google.api.client.util.Lists;
+import com.google.api.client.util.Maps;
+import com.google.common.io.Files;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+/**
+ */
+public class HdfsDataSegmentPusherTest
+{
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Test
+  public void testPush() throws Exception
+  {
+    Configuration conf = new Configuration(true);
+
+    // Create a mock segment on disk
+    File segmentDir = tempFolder.newFolder();
+    File tmp = new File(segmentDir, "version.bin");
+
+    final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
+    Files.write(data, tmp);
+    final long size = data.length;
+
+    HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
+    config.setStorageDirectory(tempFolder.newFolder().getAbsolutePath());
+    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
+
+    DataSegment segmentToPush = new DataSegment(
+        "foo",
+        new Interval("2015/2016"),
+        "0",
+        Maps.<String, Object>newHashMap(),
+        Lists.<String>newArrayList(),
+        Lists.<String>newArrayList(),
+        new NoneShardSpec(),
+        0,
+        size
+    );
+
+    DataSegment segment = pusher.push(segmentDir, segmentToPush);
+
+    Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
+  }
+}
S3DataSegmentArchiverConfig.java:

@@ -24,10 +24,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 public class S3DataSegmentArchiverConfig
 {
   @JsonProperty
-  public String archiveBucket = "";
+  private String archiveBucket = "";

   @JsonProperty
-  public String archiveBaseKey = "";
+  private String archiveBaseKey = "";

   public String getArchiveBucket()
   {
S3DataSegmentPusherConfig.java:

@@ -26,13 +26,28 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 public class S3DataSegmentPusherConfig
 {
   @JsonProperty
-  public String bucket = "";
+  private String bucket = "";

   @JsonProperty
-  public String baseKey = "";
+  private String baseKey = "";

   @JsonProperty
-  public boolean disableAcl = false;
+  private boolean disableAcl = false;
+
+  public void setBucket(String bucket)
+  {
+    this.bucket = bucket;
+  }
+
+  public void setBaseKey(String baseKey)
+  {
+    this.baseKey = baseKey;
+  }
+
+  public void setDisableAcl(boolean disableAcl)
+  {
+    this.disableAcl = disableAcl;
+  }

   public String getBucket()
   {
S3DataSegmentPusherTest.java (new file):

@@ -0,0 +1,87 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.storage.s3;
+
+import com.google.api.client.util.Lists;
+import com.google.api.client.util.Maps;
+import com.google.common.io.Files;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.easymock.EasyMock;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.S3Object;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+/**
+ */
+public class S3DataSegmentPusherTest
+{
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Test
+  public void testPush() throws Exception
+  {
+    RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class);
+    EasyMock.expect(s3Client.putObject(EasyMock.anyString(), EasyMock.<S3Object>anyObject()))
+            .andReturn(null)
+            .atLeastOnce();
+    EasyMock.replay(s3Client);
+
+    S3DataSegmentPusherConfig config = new S3DataSegmentPusherConfig();
+    config.setBucket("bucket");
+    config.setBaseKey("key");
+
+    S3DataSegmentPusher pusher = new S3DataSegmentPusher(s3Client, config, new DefaultObjectMapper());
+
+    // Create a mock segment on disk
+    File tmp = tempFolder.newFile("version.bin");
+
+    final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
+    Files.write(data, tmp);
+    final long size = data.length;
+
+    DataSegment segmentToPush = new DataSegment(
+        "foo",
+        new Interval("2015/2016"),
+        "0",
+        Maps.<String, Object>newHashMap(),
+        Lists.<String>newArrayList(),
+        Lists.<String>newArrayList(),
+        new NoneShardSpec(),
+        0,
+        size
+    );
+
+    DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
+
+    Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
+
+    EasyMock.verify(s3Client);
+  }
+}