add pusher tests for all deep storages

fjy 2016-01-05 16:42:05 -08:00
parent 6d886da7d9
commit 2103906a48
13 changed files with 511 additions and 122 deletions

View File: pom.xml (druid-azure-extensions)

@ -16,7 +16,8 @@
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-azure-extensions</artifactId>
@ -48,6 +49,12 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>

View File: AzureAccountConfig.java

@ -45,6 +45,31 @@ public class AzureAccountConfig
@NotNull
private String container;
public void setProtocol(String protocol)
{
this.protocol = protocol;
}
public void setMaxTries(int maxTries)
{
this.maxTries = maxTries;
}
public void setAccount(String account)
{
this.account = account;
}
public void setKey(String key)
{
this.key = key;
}
public void setContainer(String container)
{
this.container = container;
}
public String getProtocol() { return protocol; }
public int getMaxTries() { return maxTries; }
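Note: the setters above turn AzureAccountConfig into a plain POJO that tests can populate directly instead of stubbing it with EasyMock, which is exactly what the updated AzureDataSegmentPusherTest below does. A minimal sketch of that use (values are placeholders):

    // Build the config directly rather than mocking it (test-style usage).
    AzureAccountConfig config = new AzureAccountConfig();
    config.setAccount("account");
    config.setKey("key");
    config.setContainer("container");
    config.setProtocol("https");
    config.setMaxTries(3);
    // The getters added in this hunk read the same values back, e.g. config.getContainer() returns "container".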

View File: AzureDataSegmentPusher.java

@ -64,15 +64,6 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
return null;
}
public File createCompressedSegmentDataFile(final File indexFilesDir) throws IOException
{
final File zipOutFile = File.createTempFile("index", ".zip");
CompressionUtils.zip(indexFilesDir, zipOutFile);
return zipOutFile;
}
public File createSegmentDescriptorFile(final ObjectMapper jsonMapper, final DataSegment segment) throws
IOException
{
@ -98,6 +89,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
public DataSegment uploadDataSegment(
DataSegment segment,
final int version,
final long size,
final File compressedSegmentData,
final File descriptorFile,
final Map<String, String> azurePaths
@ -108,7 +100,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor"));
final DataSegment outSegment = segment
.withSize(compressedSegmentData.length())
.withSize(size)
.withLoadSpec(
ImmutableMap.<String, Object>of(
"type",
@ -137,18 +129,23 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
log.info("Uploading [%s] to Azure.", indexFilesDir);
final int version = SegmentUtils.getVersionFromDir(indexFilesDir);
final File compressedSegmentData = createCompressedSegmentDataFile(indexFilesDir);
final File descriptorFile = createSegmentDescriptorFile(jsonMapper, segment);
final Map<String, String> azurePaths = getAzurePaths(segment);
File zipOutFile = null;
File descriptorFile = null;
try {
final File outFile = zipOutFile = File.createTempFile("index", ".zip");
final long size = CompressionUtils.zip(indexFilesDir, zipOutFile);
final File descFile = descriptorFile = createSegmentDescriptorFile(jsonMapper, segment);
final Map<String, String> azurePaths = getAzurePaths(segment);
return AzureUtils.retryAzureOperation(
new Callable<DataSegment>()
{
@Override
public DataSegment call() throws Exception
{
return uploadDataSegment(segment, version, compressedSegmentData, descriptorFile, azurePaths);
return uploadDataSegment(segment, version, size, outFile, descFile, azurePaths);
}
},
config.getMaxTries()
@ -157,5 +154,16 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
if (zipOutFile != null) {
log.info("Deleting zipped index File[%s]", zipOutFile);
zipOutFile.delete();
}
if (descriptorFile != null) {
log.info("Deleting descriptor file[%s]", descriptorFile);
descriptorFile.delete();
}
}
}
}
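Note: the reworked push() above uses a pattern that recurs in the CloudFiles pusher below: the temp zip and descriptor are tracked in outer locals so the finally block can always delete them, while separate final aliases (outFile, descFile) are what the anonymous Callable captures, since pre-Java-8 anonymous classes can only reference final locals. The segment size now comes from the return value of CompressionUtils.zip() instead of File.length() on the temp file. A stripped-down sketch of that shape with hypothetical names (withTempZip and ZipWorker are not part of this commit):

    // Hypothetical helper showing the cleanup shape used in push().
    static <T> T withTempZip(File indexFilesDir, ZipWorker<T> worker) throws Exception
    {
      File zipOutFile = null;
      try {
        final File outFile = zipOutFile = File.createTempFile("index", ".zip");
        final long size = CompressionUtils.zip(indexFilesDir, outFile); // bytes written to the zip
        return worker.apply(outFile, size);                             // e.g. upload, possibly wrapped in retries
      }
      finally {
        if (zipOutFile != null) {
          zipOutFile.delete();                                          // always clean up the temp file
        }
      }
    }

    interface ZipWorker<T>
    {
      T apply(File zip, long size) throws Exception;
    }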

View File: AzureDataSegmentPusherTest.java

@ -21,27 +21,36 @@ package io.druid.storage.azure;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.metamx.common.MapUtils;
import com.microsoft.azure.storage.StorageException;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMockSupport;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Map;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
public class AzureDataSegmentPusherTest extends EasyMockSupport
{
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
private static final String containerName = "container";
private static final String blobPath = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
private static final DataSegment dataSegment = new DataSegment(
@ -64,9 +73,40 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
public void before()
{
azureStorage = createMock(AzureStorage.class);
azureAccountConfig = createMock(AzureAccountConfig.class);
jsonMapper = createMock(ObjectMapper.class);
azureAccountConfig = new AzureAccountConfig();
azureAccountConfig.setAccount("account");
azureAccountConfig.setContainer("container");
jsonMapper = new DefaultObjectMapper();
}
@Test
public void testPush() throws Exception
{
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
// Create a mock segment on disk
File tmp = tempFolder.newFile("version.bin");
final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
Files.write(data, tmp);
final long size = data.length;
DataSegment segmentToPush = new DataSegment(
"foo",
new Interval("2015/2016"),
"0",
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
size
);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
}
@Test
@ -93,7 +133,6 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
final File descriptorFile = new File("descriptor.json");
final Map<String, String> azurePaths = pusher.getAzurePaths(dataSegment);
expect(azureAccountConfig.getContainer()).andReturn(containerName).times(3);
azureStorage.uploadBlob(compressedSegmentData, containerName, azurePaths.get("index"));
expectLastCall();
azureStorage.uploadBlob(descriptorFile, containerName, azurePaths.get("descriptor"));
@ -104,6 +143,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
DataSegment pushedDataSegment = pusher.uploadDataSegment(
dataSegment,
version,
0, // empty file
compressedSegmentData,
descriptorFile,
azurePaths
@ -116,7 +156,5 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
assertEquals(azurePaths.get("index"), MapUtils.getString(loadSpec, "blobPath"));
verifyAll();
}
}
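Note: each of the new tests builds the same on-disk fixture: a temp location holding a 4-byte version.bin plus a DataSegment whose declared size matches. A hypothetical shared helper (not part of this commit) that the Azure, CloudFiles, HDFS, and S3 tests could all reuse:

    // Hypothetical test helper mirroring the fixture repeated in each new pusher test.
    static DataSegment writeMockSegment(File segmentDir) throws IOException
    {
      final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
      Files.write(data, new File(segmentDir, "version.bin")); // Guava Files
      return new DataSegment(
          "foo",
          new Interval("2015/2016"),
          "0",
          Maps.<String, Object>newHashMap(),
          Lists.<String>newArrayList(),
          Lists.<String>newArrayList(),
          new NoneShardSpec(),
          0,
          data.length
      );
    }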

View File: pom.xml (CloudFiles extension)

@ -101,6 +101,12 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File: CloudFilesDataSegmentPusher.java

@ -68,13 +68,17 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
{
final String segmentPath = CloudFilesUtils.buildCloudFilesPath(this.config.getBasePath(), inSegment);
final File descriptorFile = File.createTempFile("descriptor", ".json");
final File zipOutFile = File.createTempFile("druid", "index.zip");
File descriptorFile = null;
File zipOutFile = null;
try {
final File descFile = descriptorFile = File.createTempFile("descriptor", ".json");
final File outFile = zipOutFile = File.createTempFile("druid", "index.zip");
final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
log.info("Copying segment[%s] to CloudFiles at location[%s]", inSegment.getIdentifier(), segmentPath);
try {
return CloudFilesUtils.retryCloudFilesOperation(
new Callable<DataSegment>()
{
@ -82,17 +86,17 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
public DataSegment call() throws Exception
{
CloudFilesObject segmentData = new CloudFilesObject(
segmentPath, zipOutFile, objectApi.getRegion(),
segmentPath, outFile, objectApi.getRegion(),
objectApi.getContainer()
);
log.info("Pushing %s.", segmentData.getPath());
objectApi.put(segmentData);
try (FileOutputStream stream = new FileOutputStream(descriptorFile)) {
try (FileOutputStream stream = new FileOutputStream(descFile)) {
stream.write(jsonMapper.writeValueAsBytes(inSegment));
}
CloudFilesObject descriptorData = new CloudFilesObject(
segmentPath, descriptorFile,
segmentPath, descFile,
objectApi.getRegion(), objectApi.getContainer()
);
log.info("Pushing %s.", descriptorData.getPath());
@ -123,11 +127,15 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
throw Throwables.propagate(e);
}
finally {
if (zipOutFile != null) {
log.info("Deleting zipped index File[%s]", zipOutFile);
zipOutFile.delete();
}
if (descriptorFile != null) {
log.info("Deleting descriptor file[%s]", descriptorFile);
descriptorFile.delete();
}
}
}
}
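Note: both the Azure and CloudFiles pushers hand the upload Callable to a bounded retry helper (retryAzureOperation / retryCloudFilesOperation). Those helpers are not shown in this diff; presumably they amount to a retry loop along these lines (a sketch only — the real implementations may back off between attempts and retry only specific exceptions):

    // Sketch of a bounded retry loop of the kind the *Utils retry helpers provide.
    static <T> T retry(Callable<T> op, int maxTries) throws Exception
    {
      for (int attempt = 1; ; attempt++) {
        try {
          return op.call();
        }
        catch (Exception e) {
          if (attempt >= maxTries) {
            throw e;
          }
          // real code would likely log the failure and sleep/back off here
        }
      }
    }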

View File: CloudFilesDataSegmentPusherConfig.java

@ -28,7 +28,6 @@ import javax.validation.constraints.NotNull;
*/
public class CloudFilesDataSegmentPusherConfig
{
@JsonProperty
@NotNull
private String region;
@ -44,6 +43,26 @@ public class CloudFilesDataSegmentPusherConfig
@JsonProperty
private int operationMaxRetries = 10;
public void setRegion(String region)
{
this.region = region;
}
public void setContainer(String container)
{
this.container = container;
}
public void setBasePath(String basePath)
{
this.basePath = basePath;
}
public void setOperationMaxRetries(int operationMaxRetries)
{
this.operationMaxRetries = operationMaxRetries;
}
public String getRegion()
{
Preconditions.checkNotNull(region);

View File: CloudFilesDataSegmentPusherTest.java

@ -0,0 +1,93 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.cloudfiles;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.jclouds.io.Payload;
import org.jclouds.openstack.swift.v1.features.ObjectApi;
import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
/**
*/
public class CloudFilesDataSegmentPusherTest
{
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@Test
public void testPush() throws Exception
{
ObjectApi objectApi = EasyMock.createStrictMock(ObjectApi.class);
EasyMock.expect(objectApi.put(EasyMock.anyString(), EasyMock.<Payload>anyObject())).andReturn(null).atLeastOnce();
EasyMock.replay(objectApi);
CloudFilesApi api = EasyMock.createStrictMock(CloudFilesApi.class);
EasyMock.expect(api.getObjectApi(EasyMock.anyString(), EasyMock.anyString()))
.andReturn(objectApi)
.atLeastOnce();
EasyMock.replay(api);
CloudFilesDataSegmentPusherConfig config = new CloudFilesDataSegmentPusherConfig();
config.setRegion("region");
config.setContainer("container");
config.setBasePath("basePath");
CloudFilesDataSegmentPusher pusher = new CloudFilesDataSegmentPusher(api, config, new DefaultObjectMapper());
// Create a mock segment on disk
File tmp = tempFolder.newFile("version.bin");
final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
Files.write(data, tmp);
final long size = data.length;
DataSegment segmentToPush = new DataSegment(
"foo",
new Interval("2015/2016"),
"0",
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
size
);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
EasyMock.verify(api);
}
}

View File: HdfsDataSegmentPusherConfig.java

@ -26,7 +26,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class HdfsDataSegmentPusherConfig
{
@JsonProperty
public String storageDirectory = "";
private String storageDirectory = "";
public void setStorageDirectory(String storageDirectory)
{
this.storageDirectory = storageDirectory;
}
public String getStorageDirectory()
{

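Note: narrowing storageDirectory from public to private does not change configuration binding, because Jackson reads the @JsonProperty-annotated field directly; the setter exists so tests can configure the pusher in code. A minimal sketch of both paths (the DefaultObjectMapper/ImmutableMap usage is illustrative, not part of this file):

    // Programmatic path, as HdfsDataSegmentPusherTest below uses:
    HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
    config.setStorageDirectory("/tmp/druid/segments");

    // Binding path: Jackson still populates the private @JsonProperty field.
    HdfsDataSegmentPusherConfig bound = new DefaultObjectMapper().convertValue(
        ImmutableMap.<String, Object>of("storageDirectory", "/tmp/druid/segments"),
        HdfsDataSegmentPusherConfig.class
    );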
View File: HdfsDataSegmentPusherTest.java

@ -0,0 +1,78 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.hdfs;
import com.google.api.client.util.Lists;
import com.google.api.client.util.Maps;
import com.google.common.io.Files;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
/**
*/
public class HdfsDataSegmentPusherTest
{
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@Test
public void testPush() throws Exception
{
Configuration conf = new Configuration(true);
// Create a mock segment on disk
File segmentDir = tempFolder.newFolder();
File tmp = new File(segmentDir, "version.bin");
final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
Files.write(data, tmp);
final long size = data.length;
HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
config.setStorageDirectory(tempFolder.newFolder().getAbsolutePath());
HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
DataSegment segmentToPush = new DataSegment(
"foo",
new Interval("2015/2016"),
"0",
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
size
);
DataSegment segment = pusher.push(segmentDir, segmentToPush);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
}
}
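Note: this HDFS test needs no running cluster. With a default Configuration and a plain local path as the storage directory, Hadoop resolves the target to the local filesystem, so push() simply writes the zipped segment under the temp folder. A quick way to confirm which FileSystem a path resolves to (a sketch, not part of the test):

    // With no fs.defaultFS override, an unqualified path resolves to the local filesystem.
    Configuration conf = new Configuration(true);
    org.apache.hadoop.fs.Path storageDir = new org.apache.hadoop.fs.Path(config.getStorageDirectory());
    org.apache.hadoop.fs.FileSystem fs = storageDir.getFileSystem(conf);
    // fs.getScheme() is expected to be "file" here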

View File: S3DataSegmentArchiverConfig.java

@ -24,10 +24,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class S3DataSegmentArchiverConfig
{
@JsonProperty
public String archiveBucket = "";
private String archiveBucket = "";
@JsonProperty
public String archiveBaseKey = "";
private String archiveBaseKey = "";
public String getArchiveBucket()
{

View File: S3DataSegmentPusherConfig.java

@ -26,13 +26,28 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class S3DataSegmentPusherConfig
{
@JsonProperty
public String bucket = "";
private String bucket = "";
@JsonProperty
public String baseKey = "";
private String baseKey = "";
@JsonProperty
public boolean disableAcl = false;
private boolean disableAcl = false;
public void setBucket(String bucket)
{
this.bucket = bucket;
}
public void setBaseKey(String baseKey)
{
this.baseKey = baseKey;
}
public void setDisableAcl(boolean disableAcl)
{
this.disableAcl = disableAcl;
}
public String getBucket()
{

View File: S3DataSegmentPusherTest.java

@ -0,0 +1,87 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.s3;
import com.google.api.client.util.Lists;
import com.google.api.client.util.Maps;
import com.google.common.io.Files;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
/**
*/
public class S3DataSegmentPusherTest
{
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@Test
public void testPush() throws Exception
{
RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class);
EasyMock.expect(s3Client.putObject(EasyMock.anyString(), EasyMock.<S3Object>anyObject()))
.andReturn(null)
.atLeastOnce();
EasyMock.replay(s3Client);
S3DataSegmentPusherConfig config = new S3DataSegmentPusherConfig();
config.setBucket("bucket");
config.setBaseKey("key");
S3DataSegmentPusher pusher = new S3DataSegmentPusher(s3Client, config, new DefaultObjectMapper());
// Create a mock segment on disk
File tmp = tempFolder.newFile("version.bin");
final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
Files.write(data, tmp);
final long size = data.length;
DataSegment segmentToPush = new DataSegment(
"foo",
new Interval("2015/2016"),
"0",
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
size
);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
EasyMock.verify(s3Client);
}
}