correct segment path for hadoop indexer

This commit is contained in:
Jan Rudert 2013-07-10 09:21:45 +02:00
parent d5b4417280
commit ad087a7a22
3 changed files with 100 additions and 21 deletions

View File

@ -57,7 +57,9 @@ import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.shard.ShardSpec; import com.metamx.druid.shard.ShardSpec;
import com.metamx.druid.utils.JodaUtils; import com.metamx.druid.utils.JodaUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -656,22 +658,33 @@ public class HadoopDruidIndexerConfig
return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", ""))); return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", "")));
} }
public Path makeSegmentOutputPath(Bucket bucket) public Path makeSegmentOutputPath(FileSystem fileSystem, Bucket bucket)
{ {
final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get(); final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();
if (fileSystem instanceof DistributedFileSystem)
return new Path( {
String.format( return new Path(
"%s/%s/%s_%s/%s/%s", String.format(
getSegmentOutputDir(), "%s/%s/%s_%s/%s/%s",
dataSource, getSegmentOutputDir().replace(":", "_"),
bucketInterval.getStart().toString(), dataSource.replace(":", "_"),
bucketInterval.getEnd().toString(), bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime()),
getVersion(), bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime()),
bucket.partitionNum getVersion().replace(":", "_"),
) bucket.partitionNum
); ));
} }
return new Path(
String.format(
"%s/%s/%s_%s/%s/%s",
getSegmentOutputDir(),
dataSource,
bucketInterval.getStart().toString(),
bucketInterval.getEnd().toString(),
getVersion(),
bucket.partitionNum
));
}
public Job addInputPaths(Job job) throws IOException public Job addInputPaths(Job job) throws IOException
{ {

View File

@ -375,12 +375,14 @@ public class IndexGeneratorJob implements Jobby
Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get(); Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
int attemptNumber = context.getTaskAttemptID().getId(); int attemptNumber = context.getTaskAttemptID().getId();
Path indexBasePath = config.makeSegmentOutputPath(bucket);
Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());
outputFS.mkdirs(indexBasePath); FileSystem fileSystem = FileSystem.get(context.getConfiguration());
Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket);
Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());
outputFS.mkdirs(indexBasePath);
Exception caughtException = null; Exception caughtException = null;
ZipOutputStream out = null; ZipOutputStream out = null;

View File

@ -27,7 +27,12 @@ import com.metamx.druid.indexer.partitions.PartitionsSpec;
import com.metamx.druid.indexer.updater.DbUpdaterJobSpec; import com.metamx.druid.indexer.updater.DbUpdaterJobSpec;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -427,6 +432,65 @@ public class HadoopDruidIndexerConfigTest
); );
} }
@Test
public void shouldMakeHDFSCompliantSegmentOutputPath() {
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{"
+ "\"dataSource\": \"the:data:source\","
+ " \"granularitySpec\":{"
+ " \"type\":\"uniform\","
+ " \"gran\":\"hour\","
+ " \"intervals\":[\"2012-07-10/P1D\"]"
+ " },"
+ "\"segmentOutputPath\": \"/tmp/dru:id/data:test\""
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
throw Throwables.propagate(e);
}
cfg.setVersion("some:brand:new:version");
Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
Path path = cfg.makeSegmentOutputPath(new DistributedFileSystem(), bucket);
Assert.assertEquals("/tmp/dru_id/data_test/the_data_source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version/4712", path.toString());
}
@Test
public void shouldMakeDefaultSegmentOutputPathIfNotHDFS() {
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{"
+ "\"dataSource\": \"the:data:source\","
+ " \"granularitySpec\":{"
+ " \"type\":\"uniform\","
+ " \"gran\":\"hour\","
+ " \"intervals\":[\"2012-07-10/P1D\"]"
+ " },"
+ "\"segmentOutputPath\": \"/tmp/dru:id/data:test\""
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
throw Throwables.propagate(e);
}
cfg.setVersion("some:brand:new:version");
Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
Path path = cfg.makeSegmentOutputPath(new LocalFileSystem(), bucket);
Assert.assertEquals("/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712", path.toString());
}
private <T> T jsonReadWriteRead(String s, Class<T> klass) private <T> T jsonReadWriteRead(String s, Class<T> klass)
{ {
try { try {