From 01e84f10b753a2a49ec8b968beb293e431ebbc38 Mon Sep 17 00:00:00 2001
From: nishantmonu51
Date: Thu, 1 May 2014 14:35:09 +0530
Subject: [PATCH 1/3] add the checks again.

removing these checks breaks when there is no data for any interval
---
 .../indexer/DetermineHashedPartitionsJob.java | 11 +++++--
 .../druid/indexer/DeterminePartitionsJob.java | 29 ++++++++++---------
 2 files changed, 25 insertions(+), 15 deletions(-)

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index 530d155460d..68e17d6408a 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -128,6 +128,9 @@ public class DetermineHashedPartitionsJob implements Jobby
       if (!config.getSegmentGranularIntervals().isPresent()) {
         final Path intervalInfoPath = config.makeIntervalInfoPath();
         fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
+        if (!fileSystem.exists(intervalInfoPath)) {
+          throw new ISE("Path[%s] didn't exist!?", intervalInfoPath);
+        }
         List<Interval> intervals = config.jsonMapper.readValue(
             Utils.openInputStream(groupByJob, intervalInfoPath), new TypeReference<List<Interval>>()
             {
             }
@@ -145,7 +148,8 @@
       if (fileSystem == null) {
         fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration());
       }
-      final Long cardinality = config.jsonMapper.readValue(
+      if (fileSystem.exists(partitionInfoPath)) {
+        Long cardinality = config.jsonMapper.readValue(
           Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
           {
           }
@@ -162,8 +166,11 @@
           }
         }
 
-      shardSpecs.put(bucket, actualSpecs);
+        shardSpecs.put(bucket, actualSpecs);
+      } else {
+        log.info("Path[%s] didn't exist!?", partitionInfoPath);
+      }
     }
 
     config.setShardSpecs(shardSpecs);
     log.info(
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
index ddcb691ef09..890a3516189 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
@@ -215,20 +215,23 @@ public class DeterminePartitionsJob implements Jobby
       if (fileSystem == null) {
         fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
       }
-      List<ShardSpec> specs = config.jsonMapper.readValue(
-          Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
-          {
+      if (fileSystem.exists(partitionInfoPath)) {
+        List<ShardSpec> specs = config.jsonMapper.readValue(
+            Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
+            {
+            }
+        );
+
+        List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
+        for (int i = 0; i < specs.size(); ++i) {
+          actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
+          log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
+        }
+
+        shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
+      } else {
+        log.info("Path[%s] didn't exist!?", partitionInfoPath);
+      }
-      );
-
-      List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
-      for (int i = 0; i < specs.size(); ++i) {
-        actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
-        log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
-      }
-
-      shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
-
     }
 
     config.setShardSpecs(shardSpecs);
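The guard this patch restores matters because the determine-partitions reducers only write a partition-info file for intervals that actually contained rows: a missing file means "empty interval", not a broken job, and should be logged and skipped, while a missing interval-info file is still treated as a hard error (the ISE above). A minimal sketch of that guarded-read shape follows; the class and helper names (GuardedRead, readCardinality) are hypothetical stand-ins for the Druid job code, and only the Hadoop FileSystem calls are real APIs:

    // Illustrative only: a missing partition-info file signals an interval
    // with no data, so the caller logs and skips instead of failing the job.
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class GuardedRead
    {
      /** Returns null when the interval produced no partition-info file. */
      static Long readCardinality(FileSystem fs, Path partitionInfoPath) throws IOException
      {
        if (!fs.exists(partitionInfoPath)) {
          return null; // empty interval: the real job logs "didn't exist!?" and moves on
        }
        try (InputStream in = fs.open(partitionInfoPath)) {
          // The real job deserializes with config.jsonMapper.readValue(in, ...).
          return Long.parseLong(new String(in.readAllBytes(), "UTF-8").trim());
        }
      }
    }
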
From 728f1e8ee38290f2ca2267648fa20bdaf35d4d57 Mon Sep 17 00:00:00 2001
From: nishantmonu51
Date: Thu, 1 May 2014 15:01:10 +0530
Subject: [PATCH 2/3] fix exists check with compression

---
 .../indexer/DetermineHashedPartitionsJob.java |  4 ++--
 .../druid/indexer/DeterminePartitionsJob.java |  2 +-
 .../src/main/java/io/druid/indexer/Utils.java | 21 +++++++++++++++++----
 3 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index 68e17d6408a..ecf6c6118f9 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -128,7 +128,7 @@ public class DetermineHashedPartitionsJob implements Jobby
       if (!config.getSegmentGranularIntervals().isPresent()) {
         final Path intervalInfoPath = config.makeIntervalInfoPath();
         fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
-        if (!fileSystem.exists(intervalInfoPath)) {
+        if (!Utils.exists(groupByJob, fileSystem, intervalInfoPath)) {
           throw new ISE("Path[%s] didn't exist!?", intervalInfoPath);
         }
         List<Interval> intervals = config.jsonMapper.readValue(
@@ -148,7 +148,7 @@
       if (fileSystem == null) {
         fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration());
       }
-      if (fileSystem.exists(partitionInfoPath)) {
+      if (Utils.exists(groupByJob, fileSystem, partitionInfoPath)) {
         Long cardinality = config.jsonMapper.readValue(
           Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
           {
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
index 890a3516189..bfa7764fc6d 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
@@ -215,7 +215,7 @@
       if (fileSystem == null) {
         fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
       }
-      if (fileSystem.exists(partitionInfoPath)) {
+      if (Utils.exists(dimSelectionJob, fileSystem, partitionInfoPath)) {
         List<ShardSpec> specs = config.jsonMapper.readValue(
             Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
             {
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/Utils.java b/indexing-hadoop/src/main/java/io/druid/indexer/Utils.java
index 162225c31ee..fef946c6333 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/Utils.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/Utils.java
@@ -65,7 +65,8 @@ public class Utils
     return retVal;
   }
 
-  public static OutputStream makePathAndOutputStream(JobContext job, Path outputPath, boolean deleteExisting) throws IOException
+  public static OutputStream makePathAndOutputStream(JobContext job, Path outputPath, boolean deleteExisting)
+      throws IOException
   {
     OutputStream retVal;
     FileSystem fs = outputPath.getFileSystem(job.getConfiguration());
@@ -73,8 +74,7 @@
     if (fs.exists(outputPath)) {
       if (deleteExisting) {
         fs.delete(outputPath, false);
-      }
-      else {
+      } else {
         throw new ISE("outputPath[%s] must not exist.", outputPath);
       }
     }
@@ -97,6 +97,17 @@
     return openInputStream(inputPath, inputPath.getFileSystem(job.getConfiguration()));
   }
 
+  public static boolean exists(JobContext job, FileSystem fs, Path inputPath) throws IOException
+  {
+    if (!FileOutputFormat.getCompressOutput(job)) {
+      return fs.exists(inputPath);
+    } else {
+      Class<? extends CompressionCodec> codecClass = FileOutputFormat.getOutputCompressorClass(job, GzipCodec.class);
+      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job.getConfiguration());
+      return fs.exists(new Path(inputPath.toString() + codec.getDefaultExtension()));
+    }
+  }
+
   public static InputStream openInputStream(Path inputPath, final FileSystem fileSystem) throws IOException
   {
     return fileSystem.open(inputPath);
@@ -109,7 +120,9 @@
 
     return jsonMapper.readValue(
         fs.open(statsPath),
-        new TypeReference<Map<String, Object>>(){}
+        new TypeReference<Map<String, Object>>()
+        {
+        }
     );
   }
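Why the plain fs.exists probe breaks under compression: with mapred output compression enabled, the file that actually lands on disk carries the codec's extension (".gz" for the default GzipCodec), so probing the bare logical path reports every interval as empty. A self-contained illustration of the mismatch follows, using real Hadoop APIs but a hypothetical path and a default local configuration:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.ReflectionUtils;

    public class ExistsMismatch
    {
      public static void main(String[] args) throws Exception
      {
        Job job = Job.getInstance();
        FileOutputFormat.setCompressOutput(job, true); // writers now append ".gz"

        Path logical = new Path("/tmp/partitions/part-r-00000"); // path the job tracks
        FileSystem fs = logical.getFileSystem(job.getConfiguration());

        // Pre-patch probe: checks the bare path, which no writer ever created.
        System.out.println(fs.exists(logical));

        // Post-patch probe (what Utils.exists does): re-apply the writer's
        // codec choice to recover the on-disk name before checking.
        Class<? extends CompressionCodec> codecClass =
            FileOutputFormat.getOutputCompressorClass(job, GzipCodec.class);
        CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job.getConfiguration());
        System.out.println(fs.exists(new Path(logical + codec.getDefaultExtension())));
      }
    }
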
From 5137031304aa5bd74cf989daac3285b50b40c06b Mon Sep 17 00:00:00 2001
From: nishantmonu51
Date: Thu, 1 May 2014 15:20:47 +0530
Subject: [PATCH 3/3] use same logic for compression

Use same logic for compression across creating files, reading from files,
and checking file existence
---
 .../src/main/java/io/druid/indexer/Utils.java | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/Utils.java b/indexing-hadoop/src/main/java/io/druid/indexer/Utils.java
index fef946c6333..b97423974bc 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/Utils.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/Utils.java
@@ -94,7 +94,7 @@ public class Utils
 
   public static InputStream openInputStream(JobContext job, Path inputPath) throws IOException
   {
-    return openInputStream(inputPath, inputPath.getFileSystem(job.getConfiguration()));
+    return openInputStream(job, inputPath, inputPath.getFileSystem(job.getConfiguration()));
   }
 
   public static boolean exists(JobContext job, FileSystem fs, Path inputPath) throws IOException
@@ -108,9 +108,18 @@ public class Utils
     }
   }
 
-  public static InputStream openInputStream(Path inputPath, final FileSystem fileSystem) throws IOException
+  public static InputStream openInputStream(JobContext job, Path inputPath, final FileSystem fileSystem)
+      throws IOException
   {
-    return fileSystem.open(inputPath);
+    if (!FileOutputFormat.getCompressOutput(job)) {
+      return fileSystem.open(inputPath);
+    } else {
+      Class<? extends CompressionCodec> codecClass = FileOutputFormat.getOutputCompressorClass(job, GzipCodec.class);
+      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job.getConfiguration());
+      inputPath = new Path(inputPath.toString() + codec.getDefaultExtension());
+
+      return codec.createInputStream(fileSystem.open(inputPath));
+    }
   }
 
   public static Map<String, Object> getStats(JobContext job, Path statsPath)
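With this last patch the series is symmetric: the job's compress-output flag drives file creation (makePathAndOutputStream), existence checks (exists), and reads (openInputStream), so all three resolve the same on-disk name. A round-trip usage sketch under that assumption follows; the path is hypothetical, the configuration is assumed to be a default local one, and Utils is the patched io.druid.indexer.Utils:

    import java.io.InputStream;
    import java.io.OutputStream;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    import io.druid.indexer.Utils;

    public class UtilsRoundTrip
    {
      public static void main(String[] args) throws Exception
      {
        Job job = Job.getInstance();
        Path path = new Path("/tmp/druid-partition-info");

        // Writer side: per the commit message, creation applies the same
        // compression logic, appending the codec extension when enabled.
        try (OutputStream out = Utils.makePathAndOutputStream(job, path, true)) {
          out.write("{\"cardinality\":42}".getBytes("UTF-8"));
        }

        // Reader side: exists() and openInputStream() resolve the same
        // codec-suffixed file the writer produced, so this round-trips
        // whether or not FileOutputFormat.setCompressOutput(job, true) is set.
        if (Utils.exists(job, path.getFileSystem(job.getConfiguration()), path)) {
          try (InputStream in = Utils.openInputStream(job, path)) {
            System.out.println(new String(in.readAllBytes(), "UTF-8"));
          }
        }
      }
    }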