Merge pull request #527 from metamx/fix-empty-intervals

add filesystem checks again
fjy 2014-05-01 16:02:31 -06:00
commit 044c9f631c
3 changed files with 54 additions and 22 deletions
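
The commit message is terse; judging by the branch name (fix-empty-intervals) and the log lines in the diff below, the new checks guard against partition-info files that were never written because an interval produced no rows. A minimal standalone sketch (not from the PR; hypothetical local path) of the failure mode the checks avoid:

// Sketch only: opening a partition-info path that no reducer ever wrote
// throws FileNotFoundException and fails the whole determine-partitions job.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MissingPathDemo
{
  public static void main(String[] args) throws Exception
  {
    Path partitionInfoPath = new Path("/tmp/druid/partitions_0.json"); // hypothetical path
    FileSystem fs = partitionInfoPath.getFileSystem(new Configuration());
    if (fs.exists(partitionInfoPath)) {   // the guard this PR adds (via Utils.exists)
      fs.open(partitionInfoPath).close(); // safe to read
    } else {
      // log and skip instead of crashing on open()
      System.out.println("Path[" + partitionInfoPath + "] didn't exist!?");
    }
  }
}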

DetermineHashedPartitionsJob.java

@@ -128,6 +128,9 @@ public class DetermineHashedPartitionsJob implements Jobby
       if (!config.getSegmentGranularIntervals().isPresent()) {
         final Path intervalInfoPath = config.makeIntervalInfoPath();
         fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
+        if (!Utils.exists(groupByJob, fileSystem, intervalInfoPath)) {
+          throw new ISE("Path[%s] didn't exist!?", intervalInfoPath);
+        }
         List<Interval> intervals = config.jsonMapper.readValue(
             Utils.openInputStream(groupByJob, intervalInfoPath), new TypeReference<List<Interval>>()
             {
@@ -145,7 +148,8 @@ public class DetermineHashedPartitionsJob implements Jobby
         if (fileSystem == null) {
           fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration());
         }
-        final Long cardinality = config.jsonMapper.readValue(
+        if (Utils.exists(groupByJob, fileSystem, partitionInfoPath)) {
+          Long cardinality = config.jsonMapper.readValue(
             Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
             {
             }
@@ -164,6 +168,9 @@ public class DetermineHashedPartitionsJob implements Jobby
           shardSpecs.put(bucket, actualSpecs);
+        } else {
+          log.info("Path[%s] didn't exist!?", partitionInfoPath);
+        }
       }
       config.setShardSpecs(shardSpecs);
       log.info(
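
Outside of diff context, the new control flow in DetermineHashedPartitionsJob amounts to the pattern below. This is a condensed sketch using plain Hadoop types only: the bucket names, the path layout, and reading the cardinality as a raw long are illustrative stand-ins for Druid's actual config and Jackson-based parsing.

// Condensed sketch of the guard pattern above (not the exact Druid code):
// read per-bucket partition info only when the file exists; otherwise log
// and skip the bucket, so empty intervals no longer fail the job.
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PartitionInfoScan
{
  public static void main(String[] args) throws Exception
  {
    Configuration conf = new Configuration();
    Map<String, Long> cardinalities = new HashMap<>();
    for (String bucket : new String[]{"2014-04-01", "2014-04-02"}) { // hypothetical buckets
      Path partitionInfoPath = new Path("/tmp/druid/" + bucket + "/partitions.json"); // hypothetical layout
      FileSystem fs = partitionInfoPath.getFileSystem(conf);
      if (fs.exists(partitionInfoPath)) { // stands in for Utils.exists(job, fs, path)
        try (InputStream in = fs.open(partitionInfoPath)) {
          cardinalities.put(bucket, Long.parseLong(new String(in.readAllBytes()).trim()));
        }
      } else {
        System.out.printf("Path[%s] didn't exist!?%n", partitionInfoPath); // mirrors the new log.info
      }
    }
    System.out.println(cardinalities);
  }
}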

DeterminePartitionsJob.java

@@ -215,6 +215,7 @@ public class DeterminePartitionsJob implements Jobby
       if (fileSystem == null) {
         fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
       }
+      if (Utils.exists(dimSelectionJob, fileSystem, partitionInfoPath)) {
       List<ShardSpec> specs = config.jsonMapper.readValue(
           Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
           {
@@ -228,7 +229,9 @@ public class DeterminePartitionsJob implements Jobby
         }
         shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
+      } else {
+        log.info("Path[%s] didn't exist!?", partitionInfoPath);
+      }
     }
     config.setShardSpecs(shardSpecs);

Utils.java

@@ -65,7 +65,8 @@ public class Utils
     return retVal;
   }

-  public static OutputStream makePathAndOutputStream(JobContext job, Path outputPath, boolean deleteExisting) throws IOException
+  public static OutputStream makePathAndOutputStream(JobContext job, Path outputPath, boolean deleteExisting)
+      throws IOException
   {
     OutputStream retVal;
     FileSystem fs = outputPath.getFileSystem(job.getConfiguration());
@@ -73,8 +74,7 @@ public class Utils
     if (fs.exists(outputPath)) {
       if (deleteExisting) {
         fs.delete(outputPath, false);
-      }
-      else {
+      } else {
         throw new ISE("outputPath[%s] must not exist.", outputPath);
       }
     }
@@ -94,12 +94,32 @@ public class Utils
   public static InputStream openInputStream(JobContext job, Path inputPath) throws IOException
   {
-    return openInputStream(inputPath, inputPath.getFileSystem(job.getConfiguration()));
+    return openInputStream(job, inputPath, inputPath.getFileSystem(job.getConfiguration()));
   }

-  public static InputStream openInputStream(Path inputPath, final FileSystem fileSystem) throws IOException
+  public static boolean exists(JobContext job, FileSystem fs, Path inputPath) throws IOException
   {
+    if (!FileOutputFormat.getCompressOutput(job)) {
+      return fs.exists(inputPath);
+    } else {
+      Class<? extends CompressionCodec> codecClass = FileOutputFormat.getOutputCompressorClass(job, GzipCodec.class);
+      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job.getConfiguration());
+      return fs.exists(new Path(inputPath.toString() + codec.getDefaultExtension()));
+    }
+  }
+
+  public static InputStream openInputStream(JobContext job, Path inputPath, final FileSystem fileSystem)
+      throws IOException
+  {
+    if (!FileOutputFormat.getCompressOutput(job)) {
       return fileSystem.open(inputPath);
+    } else {
+      Class<? extends CompressionCodec> codecClass = FileOutputFormat.getOutputCompressorClass(job, GzipCodec.class);
+      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job.getConfiguration());
+      inputPath = new Path(inputPath.toString() + codec.getDefaultExtension());
+      return codec.createInputStream(fileSystem.open(inputPath));
+    }
   }
public static Map<String, Object> getStats(JobContext job, Path statsPath) public static Map<String, Object> getStats(JobContext job, Path statsPath)
@@ -109,7 +129,9 @@
     return jsonMapper.readValue(
         fs.open(statsPath),
-        new TypeReference<Map<String, Object>>(){}
+        new TypeReference<Map<String, Object>>()
+        {
+        }
     );
   }
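
One detail worth spelling out: the new Utils.exists and the reworked openInputStream must agree on which physical path they probe. When FileOutputFormat compression is on, the reducer writes the logical path plus the codec's extension (e.g. ".gz" for GzipCodec), so a bare fs.exists() on the logical path would report false even though the data is there. A small standalone sketch (assumed Gzip configuration; the path is hypothetical) of the path both helpers resolve:

// Sketch of the compression-aware path resolution mirrored by
// Utils.exists / Utils.openInputStream above.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class CompressedPathDemo
{
  public static void main(String[] args) throws Exception
  {
    Job job = Job.getInstance();
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

    Path logical = new Path("/tmp/druid/partitions.json"); // hypothetical path
    Class<? extends CompressionCodec> codecClass =
        FileOutputFormat.getOutputCompressorClass(job, GzipCodec.class);
    CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job.getConfiguration());

    // The file the reducer actually wrote carries the codec's extension,
    // so this is the path exists()/openInputStream() must probe.
    System.out.println(new Path(logical.toString() + codec.getDefaultExtension()));
    // prints /tmp/druid/partitions.json.gz
  }
}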