Merge pull request #1121 from gianm/issue-1116

Use the proper FileSystems for writing segments and caching jars. (for issue #1116)
This commit is contained in:
Fangjin Yang 2015-02-25 13:03:59 -08:00
commit 708f35151d
2 changed files with 12 additions and 11 deletions

View File

@ -449,11 +449,12 @@ public class IndexGeneratorJob implements Jobby
int attemptNumber = context.getTaskAttemptID().getId();
FileSystem fileSystem = FileSystem.get(context.getConfiguration());
Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket);
Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());
final FileSystem intermediateFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath()).getFileSystem(
context.getConfiguration()
);
final Path indexBasePath = config.makeSegmentOutputPath(outputFS, bucket);
final Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
outputFS.mkdirs(indexBasePath);
@ -527,7 +528,7 @@ public class IndexGeneratorJob implements Jobby
// retry 1 minute
boolean success = false;
for (int i = 0; i < 6; i++) {
if (renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
if (renameIndexFiles(intermediateFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
log.info("Successfully renamed [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
success = true;
break;
@ -557,7 +558,7 @@ public class IndexGeneratorJob implements Jobby
outputFS.delete(indexZipFilePath, true);
} else {
outputFS.delete(finalIndexZipFilePath, true);
if (!renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
if (!renameIndexFiles(intermediateFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
throw new ISE(
"Files [%s] and [%s] are different, but still cannot rename after retry loop",
indexZipFilePath.toUri().getPath(),

View File

@ -48,7 +48,7 @@ public class JobHelper
public static void setupClasspath(
HadoopDruidIndexerConfig config,
Job groupByJob
Job job
)
throws IOException
{
@ -59,9 +59,9 @@ public class JobHelper
String[] jarFiles = classpathProperty.split(File.pathSeparator);
final Configuration conf = groupByJob.getConfiguration();
final FileSystem fs = FileSystem.get(conf);
Path distributedClassPath = new Path(config.getWorkingPath(), "classpath");
final Configuration conf = job.getConfiguration();
final Path distributedClassPath = new Path(config.getWorkingPath(), "classpath");
final FileSystem fs = distributedClassPath.getFileSystem(conf);
if (fs instanceof LocalFileSystem) {
return;