From 778fd0f10ebf9016d2be789d8b2cbba43501b01f Mon Sep 17 00:00:00 2001
From: fjy
Date: Thu, 22 Aug 2013 10:16:43 -0700
Subject: [PATCH] Fix persist of empty indexes in index generator job

---
 .../metamx/druid/index/v1/IndexMerger.java      |  4 +++
 .../druid/indexer/IndexGeneratorJob.java        | 34 ++++++++++---------
 2 files changed, 22 insertions(+), 16 deletions(-)

diff --git a/indexing-common/src/main/java/com/metamx/druid/index/v1/IndexMerger.java b/indexing-common/src/main/java/com/metamx/druid/index/v1/IndexMerger.java
index 3cd842f998b..e662bf0224a 100644
--- a/indexing-common/src/main/java/com/metamx/druid/index/v1/IndexMerger.java
+++ b/indexing-common/src/main/java/com/metamx/druid/index/v1/IndexMerger.java
@@ -121,6 +121,10 @@ public class IndexMerger
       final IncrementalIndex index, final Interval dataInterval, File outDir, ProgressIndicator progress
   ) throws IOException
   {
+    if (index.isEmpty()) {
+      throw new IAE("Trying to persist an empty index!");
+    }
+
     final long firstTimestamp = index.getMinTime().getMillis();
     final long lastTimestamp = index.getMaxTime().getMillis();
     if (!(dataInterval.contains(firstTimestamp) && dataInterval.contains(lastTimestamp))) {
diff --git a/indexing-hadoop/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
index 583cafcae4f..cac256589da 100644
--- a/indexing-hadoop/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
@@ -321,7 +321,7 @@ public class IndexGeneratorJob implements Jobby
       List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
       final File mergedBase;

-      if (toMerge.size() == 0) {
+      if (toMerge.size() == 0 && !index.isEmpty()) {
         mergedBase = new File(baseFlushFile, "merged");
         IndexMerger.persist(
             index, interval, mergedBase, new IndexMerger.ProgressIndicator()
@@ -334,18 +334,20 @@ public class IndexGeneratorJob implements Jobby
           }
         );
       } else {
-        final File finalFile = new File(baseFlushFile, "final");
-        IndexMerger.persist(
-            index, interval, finalFile, new IndexMerger.ProgressIndicator()
-        {
-          @Override
-          public void progress()
+        if (!index.isEmpty()) {
+          final File finalFile = new File(baseFlushFile, "final");
+          IndexMerger.persist(
+              index, interval, finalFile, new IndexMerger.ProgressIndicator()
           {
-            context.progress();
+            @Override
+            public void progress()
+            {
+              context.progress();
+            }
           }
+          );
+          toMerge.add(finalFile);
         }
-        );
-        toMerge.add(finalFile);

         for (File file : toMerge) {
           indexes.add(IndexIO.loadIndex(file));
@@ -376,13 +378,13 @@ public class IndexGeneratorJob implements Jobby

       int attemptNumber = context.getTaskAttemptID().getId();

-      FileSystem fileSystem = FileSystem.get(context.getConfiguration());
-      Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket);
-      Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
-      final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
-      final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());
+      FileSystem fileSystem = FileSystem.get(context.getConfiguration());
+      Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket);
+      Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
+      final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
+      final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());

-      outputFS.mkdirs(indexBasePath);
+      outputFS.mkdirs(indexBasePath);

       Exception caughtException = null;
       ZipOutputStream out = null;
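
Note (not part of the patch): a minimal, self-contained sketch of the guard pattern this change introduces, assuming a simplified in-memory index. The persist step refuses an empty index, and the caller skips the persist-and-merge work when nothing was indexed. SimpleIndex, persistIndex, and flush are hypothetical stand-ins for IncrementalIndex / IndexMerger.persist, not Druid's actual API; only the index.isEmpty() check and the "Trying to persist an empty index!" failure come from the patch itself.

import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an in-memory index (IncrementalIndex in the patch).
class SimpleIndex
{
  private final List<String> rows = new ArrayList<>();

  public void add(String row) { rows.add(row); }

  public boolean isEmpty() { return rows.isEmpty(); }
}

public class EmptyIndexGuardSketch
{
  // Mirrors the IndexMerger.persist() change: fail fast rather than write a bogus segment.
  static File persistIndex(SimpleIndex index, File outDir)
  {
    if (index.isEmpty()) {
      throw new IllegalArgumentException("Trying to persist an empty index!");
    }
    // A real implementation would serialize the index here; the sketch only creates the target dir.
    outDir.mkdirs();
    return outDir;
  }

  // Mirrors the reducer-side change: only persist (and queue for merge) non-empty indexes.
  static void flush(SimpleIndex index, File baseFlushDir, List<File> toMerge)
  {
    if (!index.isEmpty()) {
      File persisted = persistIndex(index, new File(baseFlushDir, "final"));
      toMerge.add(persisted);
    }
  }

  public static void main(String[] args)
  {
    SimpleIndex empty = new SimpleIndex();
    SimpleIndex populated = new SimpleIndex();
    populated.add("row-1");

    List<File> toMerge = new ArrayList<>();
    flush(empty, new File("/tmp/flush"), toMerge);      // skipped: nothing to persist
    flush(populated, new File("/tmp/flush"), toMerge);  // persisted and queued for merge

    System.out.println("segments queued for merge: " + toMerge.size()); // prints 1
  }
}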