From 728f1e8ee38290f2ca2267648fa20bdaf35d4d57 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 1 May 2014 15:01:10 +0530 Subject: [PATCH] fix exists check with compression --- .../indexer/DetermineHashedPartitionsJob.java | 4 ++-- .../druid/indexer/DeterminePartitionsJob.java | 2 +- .../src/main/java/io/druid/indexer/Utils.java | 21 +++++++++++++++---- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java index 68e17d6408a..ecf6c6118f9 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java @@ -128,7 +128,7 @@ public class DetermineHashedPartitionsJob implements Jobby if (!config.getSegmentGranularIntervals().isPresent()) { final Path intervalInfoPath = config.makeIntervalInfoPath(); fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration()); - if (!fileSystem.exists(intervalInfoPath)) { + if (!Utils.exists(groupByJob, fileSystem, intervalInfoPath)) { throw new ISE("Path[%s] didn't exist!?", intervalInfoPath); } List intervals = config.jsonMapper.readValue( @@ -148,7 +148,7 @@ public class DetermineHashedPartitionsJob implements Jobby if (fileSystem == null) { fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration()); } - if (fileSystem.exists(partitionInfoPath)) { + if (Utils.exists(groupByJob, fileSystem, partitionInfoPath)) { Long cardinality = config.jsonMapper.readValue( Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference() { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java index 890a3516189..bfa7764fc6d 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java @@ -215,7 +215,7 @@ public class DeterminePartitionsJob implements Jobby if (fileSystem == null) { fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration()); } - if (fileSystem.exists(partitionInfoPath)) { + if (Utils.exists(dimSelectionJob, fileSystem, partitionInfoPath)) { List specs = config.jsonMapper.readValue( Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference>() { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/Utils.java b/indexing-hadoop/src/main/java/io/druid/indexer/Utils.java index 162225c31ee..fef946c6333 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/Utils.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/Utils.java @@ -65,7 +65,8 @@ public class Utils return retVal; } - public static OutputStream makePathAndOutputStream(JobContext job, Path outputPath, boolean deleteExisting) throws IOException + public static OutputStream makePathAndOutputStream(JobContext job, Path outputPath, boolean deleteExisting) + throws IOException { OutputStream retVal; FileSystem fs = outputPath.getFileSystem(job.getConfiguration()); @@ -73,8 +74,7 @@ public class Utils if (fs.exists(outputPath)) { if (deleteExisting) { fs.delete(outputPath, false); - } - else { + } else { throw new ISE("outputPath[%s] must not exist.", outputPath); } } @@ -97,6 +97,17 @@ public class Utils return openInputStream(inputPath, inputPath.getFileSystem(job.getConfiguration())); } + public static boolean exists(JobContext job, FileSystem fs, Path inputPath) throws IOException + { + if (!FileOutputFormat.getCompressOutput(job)) { + return fs.exists(inputPath); + } else { + Class codecClass = FileOutputFormat.getOutputCompressorClass(job, GzipCodec.class); + CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job.getConfiguration()); + return fs.exists(new Path(inputPath.toString() + codec.getDefaultExtension())); + } + } + public static InputStream openInputStream(Path inputPath, final FileSystem fileSystem) throws IOException { return fileSystem.open(inputPath); @@ -109,7 +120,9 @@ public class Utils return jsonMapper.readValue( fs.open(statsPath), - new TypeReference>(){} + new TypeReference>() + { + } ); }