t Merge branch 'master' into indexing_refactor

This commit is contained in:
Fangjin Yang 2013-02-11 12:37:20 -08:00
commit 69d0d98df6
4 changed files with 132 additions and 6 deletions

View File

@ -0,0 +1,42 @@
package com.metamx.druid.shard;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.partition.LinearPartitionChunk;
import com.metamx.druid.partition.PartitionChunk;
import org.codehaus.jackson.annotate.JsonProperty;
import java.util.Map;
public class LinearShardSpec implements ShardSpec {
private int partitionNum;
public LinearShardSpec() {
this(-1);
}
public LinearShardSpec(int partitionNum) {
this.partitionNum = partitionNum;
}
@JsonProperty("partitionNum")
@Override
public int getPartitionNum() {
return partitionNum;
}
@Override
public <T> PartitionChunk<T> createChunk(T obj) {
return new LinearPartitionChunk<T>(partitionNum, obj);
}
@Override
public boolean isInChunk(Map<String, String> dimensions) {
return true;
}
@Override
public boolean isInChunk(InputRow inputRow) {
return true;
}
}

View File

@ -32,7 +32,8 @@ import java.util.Map;
@JsonTypeInfo(use=JsonTypeInfo.Id.NAME, property="type", include=JsonTypeInfo.As.PROPERTY)
@JsonSubTypes({
@JsonSubTypes.Type(name="single", value=SingleDimensionShardSpec.class),
@JsonSubTypes.Type(name="none", value=NoneShardSpec.class)
@JsonSubTypes.Type(name="none", value=NoneShardSpec.class),
@JsonSubTypes.Type(name="linear", value=LinearShardSpec.class)
})
public interface ShardSpec
{

View File

@ -0,0 +1,84 @@
package com.metamx.druid.partition;
public class LinearPartitionChunk <T> implements PartitionChunk<T>
{
private final int chunkNumber;
private final T object;
public static <T> LinearPartitionChunk<T> make(int chunkNumber, T obj)
{
return new LinearPartitionChunk<T>(chunkNumber, obj);
}
public LinearPartitionChunk(
int chunkNumber,
T object
)
{
this.chunkNumber = chunkNumber;
this.object = object;
}
@Override
public T getObject()
{
return object;
}
@Override
public boolean abuts(PartitionChunk<T> chunk)
{
return true; // always complete
}
@Override
public boolean isStart()
{
return true; // always complete
}
@Override
public boolean isEnd()
{
return true; // always complete
}
@Override
public int getChunkNumber()
{
return chunkNumber;
}
@Override
public int compareTo(PartitionChunk<T> chunk)
{
if (chunk instanceof LinearPartitionChunk) {
LinearPartitionChunk<T> linearChunk = (LinearPartitionChunk<T>) chunk;
return chunkNumber - chunk.getChunkNumber();
}
throw new IllegalArgumentException("Cannot compare against something that is not a LinearPartitionChunk.");
}
@Override
@SuppressWarnings("unchecked")
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
return compareTo((LinearPartitionChunk<T>) o) == 0;
}
@Override
public int hashCode()
{
return chunkNumber;
}
}

View File

@ -31,14 +31,11 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.acl.gs.GSAccessControlList;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.*;
import java.security.NoSuchAlgorithmException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@ -105,6 +102,7 @@ public class S3SegmentPusher implements SegmentPusher
final String outputBucket = config.getBucket();
toPush.setBucketName(outputBucket);
toPush.setKey(outputKey + "/index.zip");
toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
log.info("Pushing %s.", toPush);
s3Client.putObject(outputBucket, toPush);
@ -124,6 +122,7 @@ public class S3SegmentPusher implements SegmentPusher
S3Object descriptorObject = new S3Object(descriptorFile);
descriptorObject.setBucketName(outputBucket);
descriptorObject.setKey(outputKey + "/descriptor.json");
descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
log.info("Pushing %s", descriptorObject);
s3Client.putObject(outputBucket, descriptorObject);