diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index bb50ceb0749..c64d08bc4ca 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -432,11 +432,12 @@ public class IndexGeneratorJob implements Jobby int attemptNumber = context.getTaskAttemptID().getId(); - FileSystem fileSystem = FileSystem.get(context.getConfiguration()); - Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket); - Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber)); - final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()); - final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration()); + final FileSystem intermediateFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()); + final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath()).getFileSystem( + context.getConfiguration() + ); + final Path indexBasePath = config.makeSegmentOutputPath(outputFS, bucket); + final Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber)); outputFS.mkdirs(indexBasePath); @@ -502,7 +503,7 @@ public class IndexGeneratorJob implements Jobby // retry 1 minute boolean success = false; for (int i = 0; i < 6; i++) { - if (renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) { + if (renameIndexFiles(intermediateFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) { log.info("Successfully renamed [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath); success = true; break; @@ -532,7 +533,7 @@ public class IndexGeneratorJob implements Jobby outputFS.delete(indexZipFilePath, true); } else { outputFS.delete(finalIndexZipFilePath, true); - if (!renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) { + if (!renameIndexFiles(intermediateFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) { throw new ISE( "Files [%s] and [%s] are different, but still cannot rename after retry loop", indexZipFilePath.toUri().getPath(), diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 05edd4a238a..e22631051de 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -48,7 +48,7 @@ public class JobHelper public static void setupClasspath( HadoopDruidIndexerConfig config, - Job groupByJob + Job job ) throws IOException { @@ -59,9 +59,9 @@ public class JobHelper String[] jarFiles = classpathProperty.split(File.pathSeparator); - final Configuration conf = groupByJob.getConfiguration(); - final FileSystem fs = FileSystem.get(conf); - Path distributedClassPath = new Path(config.getWorkingPath(), "classpath"); + final Configuration conf = job.getConfiguration(); + final Path distributedClassPath = new Path(config.getWorkingPath(), "classpath"); + final FileSystem fs = distributedClassPath.getFileSystem(conf); if (fs instanceof LocalFileSystem) { return;