Use the proper FileSystems for writing segments and caching jars. (for issue #1116)

This commit is contained in:
Gian Merlino 2015-02-12 16:19:59 -08:00
parent 040e771da2
commit fd5a7d1f08
2 changed files with 12 additions and 11 deletions

View File

@ -432,11 +432,12 @@ public class IndexGeneratorJob implements Jobby
int attemptNumber = context.getTaskAttemptID().getId(); int attemptNumber = context.getTaskAttemptID().getId();
FileSystem fileSystem = FileSystem.get(context.getConfiguration()); final FileSystem intermediateFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket); final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath()).getFileSystem(
Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber)); context.getConfiguration()
final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()); );
final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration()); final Path indexBasePath = config.makeSegmentOutputPath(outputFS, bucket);
final Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
outputFS.mkdirs(indexBasePath); outputFS.mkdirs(indexBasePath);
@ -502,7 +503,7 @@ public class IndexGeneratorJob implements Jobby
// retry 1 minute // retry 1 minute
boolean success = false; boolean success = false;
for (int i = 0; i < 6; i++) { for (int i = 0; i < 6; i++) {
if (renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) { if (renameIndexFiles(intermediateFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
log.info("Successfully renamed [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath); log.info("Successfully renamed [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
success = true; success = true;
break; break;
@ -532,7 +533,7 @@ public class IndexGeneratorJob implements Jobby
outputFS.delete(indexZipFilePath, true); outputFS.delete(indexZipFilePath, true);
} else { } else {
outputFS.delete(finalIndexZipFilePath, true); outputFS.delete(finalIndexZipFilePath, true);
if (!renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) { if (!renameIndexFiles(intermediateFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
throw new ISE( throw new ISE(
"Files [%s] and [%s] are different, but still cannot rename after retry loop", "Files [%s] and [%s] are different, but still cannot rename after retry loop",
indexZipFilePath.toUri().getPath(), indexZipFilePath.toUri().getPath(),

View File

@ -48,7 +48,7 @@ public class JobHelper
public static void setupClasspath( public static void setupClasspath(
HadoopDruidIndexerConfig config, HadoopDruidIndexerConfig config,
Job groupByJob Job job
) )
throws IOException throws IOException
{ {
@ -59,9 +59,9 @@ public class JobHelper
String[] jarFiles = classpathProperty.split(File.pathSeparator); String[] jarFiles = classpathProperty.split(File.pathSeparator);
final Configuration conf = groupByJob.getConfiguration(); final Configuration conf = job.getConfiguration();
final FileSystem fs = FileSystem.get(conf); final Path distributedClassPath = new Path(config.getWorkingPath(), "classpath");
Path distributedClassPath = new Path(config.getWorkingPath(), "classpath"); final FileSystem fs = distributedClassPath.getFileSystem(conf);
if (fs instanceof LocalFileSystem) { if (fs instanceof LocalFileSystem) {
return; return;