Fix persist of empty indexes in index generator job

This commit is contained in:
fjy 2013-08-22 10:16:43 -07:00
parent 4102dbd1e0
commit 778fd0f10e
2 changed files with 22 additions and 16 deletions

View File

@ -121,6 +121,10 @@ public class IndexMerger
final IncrementalIndex index, final Interval dataInterval, File outDir, ProgressIndicator progress
) throws IOException
{
if (index.isEmpty()) {
throw new IAE("Trying to persist an empty index!");
}
final long firstTimestamp = index.getMinTime().getMillis();
final long lastTimestamp = index.getMaxTime().getMillis();
if (!(dataInterval.contains(firstTimestamp) && dataInterval.contains(lastTimestamp))) {

View File

@ -321,7 +321,7 @@ public class IndexGeneratorJob implements Jobby
List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
final File mergedBase;
if (toMerge.size() == 0) {
if (toMerge.size() == 0 && !index.isEmpty()) {
mergedBase = new File(baseFlushFile, "merged");
IndexMerger.persist(
index, interval, mergedBase, new IndexMerger.ProgressIndicator()
@ -334,18 +334,20 @@ public class IndexGeneratorJob implements Jobby
}
);
} else {
final File finalFile = new File(baseFlushFile, "final");
IndexMerger.persist(
index, interval, finalFile, new IndexMerger.ProgressIndicator()
{
@Override
public void progress()
if (!index.isEmpty()) {
final File finalFile = new File(baseFlushFile, "final");
IndexMerger.persist(
index, interval, finalFile, new IndexMerger.ProgressIndicator()
{
context.progress();
@Override
public void progress()
{
context.progress();
}
}
);
toMerge.add(finalFile);
}
);
toMerge.add(finalFile);
for (File file : toMerge) {
indexes.add(IndexIO.loadIndex(file));
@ -376,13 +378,13 @@ public class IndexGeneratorJob implements Jobby
int attemptNumber = context.getTaskAttemptID().getId();
FileSystem fileSystem = FileSystem.get(context.getConfiguration());
Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket);
Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());
FileSystem fileSystem = FileSystem.get(context.getConfiguration());
Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket);
Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());
outputFS.mkdirs(indexBasePath);
outputFS.mkdirs(indexBasePath);
Exception caughtException = null;
ZipOutputStream out = null;