mirror of https://github.com/apache/druid.git
Merge pull request #77 from metacret/master
Adding LinearShardSpec and ACL on S3 object
This commit is contained in:
commit
05168808c2
|
@ -0,0 +1,49 @@
|
||||||
|
package com.metamx.druid.shard;
|
||||||
|
|
||||||
|
import com.metamx.druid.input.InputRow;
|
||||||
|
import com.metamx.druid.partition.LinearPartitionChunk;
|
||||||
|
import com.metamx.druid.partition.PartitionChunk;
|
||||||
|
import org.codehaus.jackson.annotate.JsonProperty;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created with IntelliJ IDEA.
|
||||||
|
* User: jbae
|
||||||
|
* Date: 2/4/13
|
||||||
|
* Time: 10:22 AM
|
||||||
|
* To change this template use File | Settings | File Templates.
|
||||||
|
*/
|
||||||
|
public class LinearShardSpec implements ShardSpec {
|
||||||
|
private int partitionNum;
|
||||||
|
|
||||||
|
public LinearShardSpec() {
|
||||||
|
this(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public LinearShardSpec(int partitionNum) {
|
||||||
|
this.partitionNum = partitionNum;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty("partitionNum")
|
||||||
|
@Override
|
||||||
|
public int getPartitionNum() {
|
||||||
|
return partitionNum;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> PartitionChunk<T> createChunk(T obj) {
|
||||||
|
return new LinearPartitionChunk<T>(partitionNum, obj);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isInChunk(Map<String, String> dimensions) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isInChunk(InputRow inputRow) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -32,7 +32,8 @@ import java.util.Map;
|
||||||
@JsonTypeInfo(use=JsonTypeInfo.Id.NAME, property="type", include=JsonTypeInfo.As.PROPERTY)
|
@JsonTypeInfo(use=JsonTypeInfo.Id.NAME, property="type", include=JsonTypeInfo.As.PROPERTY)
|
||||||
@JsonSubTypes({
|
@JsonSubTypes({
|
||||||
@JsonSubTypes.Type(name="single", value=SingleDimensionShardSpec.class),
|
@JsonSubTypes.Type(name="single", value=SingleDimensionShardSpec.class),
|
||||||
@JsonSubTypes.Type(name="none", value=NoneShardSpec.class)
|
@JsonSubTypes.Type(name="none", value=NoneShardSpec.class),
|
||||||
|
@JsonSubTypes.Type(name="linear", value=LinearShardSpec.class)
|
||||||
})
|
})
|
||||||
public interface ShardSpec
|
public interface ShardSpec
|
||||||
{
|
{
|
||||||
|
|
|
@ -0,0 +1,91 @@
|
||||||
|
package com.metamx.druid.partition;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created with IntelliJ IDEA.
|
||||||
|
* User: jbae
|
||||||
|
* Date: 2/4/13
|
||||||
|
* Time: 10:24 AM
|
||||||
|
* To change this template use File | Settings | File Templates.
|
||||||
|
*/
|
||||||
|
public class LinearPartitionChunk <T> implements PartitionChunk<T>
|
||||||
|
{
|
||||||
|
private final int chunkNumber;
|
||||||
|
private final T object;
|
||||||
|
|
||||||
|
public static <T> LinearPartitionChunk<T> make(int chunkNumber, T obj)
|
||||||
|
{
|
||||||
|
return new LinearPartitionChunk<T>(chunkNumber, obj);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public LinearPartitionChunk(
|
||||||
|
int chunkNumber,
|
||||||
|
T object
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.chunkNumber = chunkNumber;
|
||||||
|
this.object = object;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T getObject()
|
||||||
|
{
|
||||||
|
return object;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean abuts(PartitionChunk<T> chunk)
|
||||||
|
{
|
||||||
|
return true; // always complete
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isStart()
|
||||||
|
{
|
||||||
|
return true; // always complete
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
|
||||||
|
public boolean isEnd()
|
||||||
|
{
|
||||||
|
return true; // always complete
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getChunkNumber()
|
||||||
|
{
|
||||||
|
return chunkNumber;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(PartitionChunk<T> chunk)
|
||||||
|
{
|
||||||
|
if (chunk instanceof LinearPartitionChunk) {
|
||||||
|
LinearPartitionChunk<T> linearChunk = (LinearPartitionChunk<T>) chunk;
|
||||||
|
|
||||||
|
return chunkNumber - chunk.getChunkNumber();
|
||||||
|
}
|
||||||
|
throw new IllegalArgumentException("Cannot compare against something that is not a LinearPartitionChunk.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public boolean equals(Object o)
|
||||||
|
{
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return compareTo((LinearPartitionChunk<T>) o) == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
return chunkNumber;
|
||||||
|
}
|
||||||
|
}
|
|
@ -31,14 +31,11 @@ import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
import org.jets3t.service.S3ServiceException;
|
import org.jets3t.service.S3ServiceException;
|
||||||
|
import org.jets3t.service.acl.gs.GSAccessControlList;
|
||||||
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
|
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
|
||||||
import org.jets3t.service.model.S3Object;
|
import org.jets3t.service.model.S3Object;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.*;
|
||||||
import java.io.File;
|
|
||||||
import java.io.FileInputStream;
|
|
||||||
import java.io.FileOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.util.zip.ZipEntry;
|
import java.util.zip.ZipEntry;
|
||||||
import java.util.zip.ZipOutputStream;
|
import java.util.zip.ZipOutputStream;
|
||||||
|
@ -105,6 +102,7 @@ public class S3SegmentPusher implements SegmentPusher
|
||||||
final String outputBucket = config.getBucket();
|
final String outputBucket = config.getBucket();
|
||||||
toPush.setBucketName(outputBucket);
|
toPush.setBucketName(outputBucket);
|
||||||
toPush.setKey(outputKey + "/index.zip");
|
toPush.setKey(outputKey + "/index.zip");
|
||||||
|
toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
|
||||||
|
|
||||||
log.info("Pushing %s.", toPush);
|
log.info("Pushing %s.", toPush);
|
||||||
s3Client.putObject(outputBucket, toPush);
|
s3Client.putObject(outputBucket, toPush);
|
||||||
|
@ -124,6 +122,7 @@ public class S3SegmentPusher implements SegmentPusher
|
||||||
S3Object descriptorObject = new S3Object(descriptorFile);
|
S3Object descriptorObject = new S3Object(descriptorFile);
|
||||||
descriptorObject.setBucketName(outputBucket);
|
descriptorObject.setBucketName(outputBucket);
|
||||||
descriptorObject.setKey(outputKey + "/descriptor.json");
|
descriptorObject.setKey(outputKey + "/descriptor.json");
|
||||||
|
descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
|
||||||
|
|
||||||
log.info("Pushing %s", descriptorObject);
|
log.info("Pushing %s", descriptorObject);
|
||||||
s3Client.putObject(outputBucket, descriptorObject);
|
s3Client.putObject(outputBucket, descriptorObject);
|
||||||
|
|
Loading…
Reference in New Issue