Merge pull request #179 from zanox/master
Fix path used when storing segments to HDFS
commit 66adef5d64
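HDFS does not allow ":" in path names (HDFS-13), while Druid datasources, segment versions, and interval timestamps routinely contain colons. The diff below therefore builds an HDFS-specific segment output path in which every ":" is replaced by "_" and interval boundaries are rendered with the colon-free basicDateTime format; non-HDFS filesystems keep the existing colon-containing layout. A minimal, self-contained sketch of that path construction follows — the values are hypothetical (borrowed from the new unit test) and the class name is made up; the real code takes this branch inside HadoopDruidIndexerConfig.makeSegmentOutputPath only when the output FileSystem is a DistributedFileSystem.

// Sketch only: illustrates the HDFS-compliant path layout introduced by this patch,
// using hypothetical config values rather than the actual HadoopDruidIndexerConfig wiring.
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;

public class HdfsSegmentPathSketch
{
  public static void main(String[] args)
  {
    String segmentOutputDir = "/tmp/dru:id/data:test";   // hypothetical segmentOutputPath
    String dataSource = "the:data:source";               // hypothetical datasource name
    String version = "some:brand:new:version";           // hypothetical segment version
    DateTime bucketStart = new DateTime(2012, 7, 10, 5, 0, 0, 0, DateTimeZone.UTC);
    DateTime bucketEnd = bucketStart.plusHours(1);
    int partitionNum = 4712;

    // Colons are stripped from every user-supplied component, and the interval bounds
    // use the colon-free basicDateTime format, because HDFS rejects ':' in path names.
    String hdfsPath = String.format(
        "%s/%s/%s_%s/%s/%s",
        segmentOutputDir.replace(":", "_"),
        dataSource.replace(":", "_"),
        bucketStart.toString(ISODateTimeFormat.basicDateTime()),
        bucketEnd.toString(ISODateTimeFormat.basicDateTime()),
        version.replace(":", "_"),
        partitionNum
    );

    // Prints: /tmp/dru_id/data_test/the_data_source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version/4712
    System.out.println(hdfsPath);
  }
}

On other filesystems the method falls back to the previous layout, which keeps the colons; both behaviours are exercised by the new tests shouldMakeHDFSCompliantSegmentOutputPath and shouldMakeDefaultSegmentOutputPathIfNotHDFS further down.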
@@ -57,7 +57,9 @@ import com.metamx.druid.jackson.DefaultObjectMapper;
 import com.metamx.druid.shard.ShardSpec;
 import com.metamx.druid.utils.JodaUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapreduce.Job;
 
 import org.joda.time.DateTime;
@@ -656,10 +658,22 @@ public class HadoopDruidIndexerConfig
     return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", "")));
   }
 
-  public Path makeSegmentOutputPath(Bucket bucket)
+  public Path makeSegmentOutputPath(FileSystem fileSystem, Bucket bucket)
   {
     final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();
+    if (fileSystem instanceof DistributedFileSystem)
+    {
+      return new Path(
+          String.format(
+              "%s/%s/%s_%s/%s/%s",
+              getSegmentOutputDir().replace(":", "_"),
+              dataSource.replace(":", "_"),
+              bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime()),
+              bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime()),
+              getVersion().replace(":", "_"),
+              bucket.partitionNum
+          ));
+    }
     return new Path(
         String.format(
             "%s/%s/%s_%s/%s/%s",
@@ -669,8 +683,7 @@ public class HadoopDruidIndexerConfig
             bucketInterval.getEnd().toString(),
             getVersion(),
             bucket.partitionNum
-        )
-    );
+        ));
   }
 
   public Job addInputPaths(Job job) throws IOException
@@ -375,7 +375,9 @@ public class IndexGeneratorJob implements Jobby
       Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
 
       int attemptNumber = context.getTaskAttemptID().getId();
-      Path indexBasePath = config.makeSegmentOutputPath(bucket);
+
+      FileSystem fileSystem = FileSystem.get(context.getConfiguration());
+      Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket);
       Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
       final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
       final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());
@@ -27,7 +27,12 @@ import com.metamx.druid.indexer.partitions.PartitionsSpec;
 import com.metamx.druid.indexer.updater.DbUpdaterJobSpec;
 import com.metamx.druid.jackson.DefaultObjectMapper;
 
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
+import org.joda.time.format.ISODateTimeFormat;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -427,6 +432,65 @@ public class HadoopDruidIndexerConfigTest
     );
   }
 
+
+  @Test
+  public void shouldMakeHDFSCompliantSegmentOutputPath() {
+    final HadoopDruidIndexerConfig cfg;
+
+    try {
+      cfg = jsonReadWriteRead(
+          "{"
+          + "\"dataSource\": \"the:data:source\","
+          + " \"granularitySpec\":{"
+          + "   \"type\":\"uniform\","
+          + "   \"gran\":\"hour\","
+          + "   \"intervals\":[\"2012-07-10/P1D\"]"
+          + " },"
+          + "\"segmentOutputPath\": \"/tmp/dru:id/data:test\""
+          + "}",
+          HadoopDruidIndexerConfig.class
+      );
+    } catch(Exception e) {
+      throw Throwables.propagate(e);
+    }
+
+    cfg.setVersion("some:brand:new:version");
+
+    Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
+    Path path = cfg.makeSegmentOutputPath(new DistributedFileSystem(), bucket);
+    Assert.assertEquals("/tmp/dru_id/data_test/the_data_source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version/4712", path.toString());
+
+  }
+
+  @Test
+  public void shouldMakeDefaultSegmentOutputPathIfNotHDFS() {
+    final HadoopDruidIndexerConfig cfg;
+
+    try {
+      cfg = jsonReadWriteRead(
+          "{"
+          + "\"dataSource\": \"the:data:source\","
+          + " \"granularitySpec\":{"
+          + "   \"type\":\"uniform\","
+          + "   \"gran\":\"hour\","
+          + "   \"intervals\":[\"2012-07-10/P1D\"]"
+          + " },"
+          + "\"segmentOutputPath\": \"/tmp/dru:id/data:test\""
+          + "}",
+          HadoopDruidIndexerConfig.class
+      );
+    } catch(Exception e) {
+      throw Throwables.propagate(e);
+    }
+
+    cfg.setVersion("some:brand:new:version");
+
+    Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
+    Path path = cfg.makeSegmentOutputPath(new LocalFileSystem(), bucket);
+    Assert.assertEquals("/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712", path.toString());
+
+  }
+
   private <T> T jsonReadWriteRead(String s, Class<T> klass)
   {
     try {
@@ -19,6 +19,8 @@
 
 package com.metamx.druid.loading;
 
+import org.joda.time.format.ISODateTimeFormat;
+
 import com.google.common.base.Joiner;
 import com.metamx.druid.client.DataSegment;
 
@@ -41,4 +43,22 @@ public class DataSegmentPusherUtil
         segment.getShardSpec().getPartitionNum()
     );
   }
+
+  /**
+   * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in
+   * path names. So we format paths differently for HDFS.
+   */
+  public static String getHdfsStorageDir(DataSegment segment)
+  {
+    return JOINER.join(
+        segment.getDataSource(),
+        String.format(
+            "%s_%s",
+            segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()),
+            segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime())
+        ),
+        segment.getVersion().replaceAll(":", "_"),
+        segment.getShardSpec().getPartitionNum()
+    );
+  }
 }
@@ -42,7 +42,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
   @Override
   public DataSegment push(File inDir, DataSegment segment) throws IOException
   {
-    final String storageDir = DataSegmentPusherUtil.getStorageDir(segment);
+    final String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment);
     Path outFile = new Path(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir));
     FileSystem fs = outFile.getFileSystem(hadoopConfig);
 
@@ -0,0 +1,39 @@
+package com.metamx.druid.loading;
+
+import com.google.common.collect.ImmutableMap;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.index.v1.IndexIO;
+import com.metamx.druid.shard.NoneShardSpec;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+/**
+ * @author jan.rudert
+ */
+public class DataSegmentPusherUtilTest {
+  @Test
+  public void shouldNotHaveColonsInHdfsStorageDir() throws Exception {
+
+    Interval interval = new Interval("2011-10-01/2011-10-02");
+    ImmutableMap<String, Object> loadSpec = ImmutableMap.<String, Object>of("something", "or_other");
+
+    DataSegment segment = new DataSegment(
+        "something",
+        interval,
+        "brand:new:version",
+        loadSpec,
+        Arrays.asList("dim1", "dim2"),
+        Arrays.asList("met1", "met2"),
+        new NoneShardSpec(),
+        IndexIO.CURRENT_VERSION_ID,
+        1
+    );
+
+    String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment);
+    Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version/0", storageDir);
+
+  }
+}