Fix Index hadoop failing with index.zip is not a valid DFS filename (#11316)

* Fix bug

* Simplify class loading

* Fix example configs for integration tests

* Small classloader cleanup

Co-authored-by: jon-wei <jon.wei@imply.io>
zachjsh 2021-06-01 19:14:50 -04:00 committed by GitHub
parent a24817d20a
commit 27f1b6cbf3
4 changed files with 16 additions and 27 deletions


@@ -476,8 +476,8 @@ public class JobHelper
     return new DataSegmentAndIndexZipFilePath(
         finalSegment,
-        tmpPath.toUri().getPath(),
-        finalIndexZipFilePath.toUri().getPath()
+        tmpPath.toUri().toString(),
+        finalIndexZipFilePath.toUri().toString()
     );
   }
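Note on the getPath() to toString() change above: a Hadoop Path's toUri().getPath() drops the scheme and authority, so a deep-storage location such as hdfs://... comes back as a bare local-style path, and the later rename step can resolve it against the wrong filesystem, surfacing as "index.zip is not a valid DFS filename". A minimal sketch of the difference, assuming only a Hadoop client dependency on the classpath (the namenode address and path are hypothetical):

    import org.apache.hadoop.fs.Path;

    public class UriVsPath
    {
      public static void main(String[] args)
      {
        Path indexZip = new Path("hdfs://namenode:8020/druid/segments/index.zip");
        // getPath() loses the filesystem identity:
        System.out.println(indexZip.toUri().getPath());   // /druid/segments/index.zip
        // toString() keeps the fully qualified location:
        System.out.println(indexZip.toUri().toString());  // hdfs://namenode:8020/druid/segments/index.zip
      }
    }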


@@ -450,16 +450,11 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
       List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths = buildSegmentsStatus.getDataSegmentAndIndexZipFilePaths();
       if (dataSegmentAndIndexZipFilePaths != null) {
         indexGeneratorJobSuccess = true;
-        try {
-          Thread.currentThread().setContextClassLoader(oldLoader);
-          renameSegmentIndexFilesJob(
-              toolbox.getJsonMapper().writeValueAsString(indexerSchema),
-              toolbox.getJsonMapper().writeValueAsString(dataSegmentAndIndexZipFilePaths)
-          );
-        }
-        finally {
-          Thread.currentThread().setContextClassLoader(loader);
-        }
+        renameSegmentIndexFilesJob(
+            toolbox.getJsonMapper().writeValueAsString(indexerSchema),
+            toolbox.getJsonMapper().writeValueAsString(dataSegmentAndIndexZipFilePaths)
+        );
         ArrayList<DataSegment> segments = new ArrayList<>(dataSegmentAndIndexZipFilePaths.stream()
             .map(
                 DataSegmentAndIndexZipFilePath::getSegment)
@@ -545,22 +540,20 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     }
   }

+  /**
+   * Must be called only when the hadoopy classloader is the current classloader
+   */
   private void renameSegmentIndexFilesJob(
       String hadoopIngestionSpecStr,
       String dataSegmentAndIndexZipFilePathListStr
   )
   {
-    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+    final ClassLoader loader = Thread.currentThread().getContextClassLoader();
     try {
-      ClassLoader loader = HadoopTask.buildClassLoader(
-          getHadoopDependencyCoordinates(),
-          taskConfig.getDefaultHadoopCoordinates()
-      );
-      Object renameSegmentIndexFilesRunner = getForeignClassloaderObject(
-          "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopRenameSegmentIndexFilesRunner",
-          loader
-      );
+      final Class<?> clazz = loader.loadClass(
+          "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopRenameSegmentIndexFilesRunner"
+      );
+      Object renameSegmentIndexFilesRunner = clazz.newInstance();

       String[] renameSegmentIndexFilesJobInput = new String[]{
           hadoopIngestionSpecStr,
@@ -573,7 +566,6 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
           renameSegmentIndexFilesJobInput.getClass()
       );
-      Thread.currentThread().setContextClassLoader(loader);
       renameSegmentIndexFiles.invoke(
           renameSegmentIndexFilesRunner,
           new Object[]{renameSegmentIndexFilesJobInput}
@@ -582,9 +574,6 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     catch (Exception e) {
       throw new RuntimeException(e);
     }
-    finally {
-      Thread.currentThread().setContextClassLoader(oldLoader);
-    }
   }

   private void indexerGeneratorCleanupJob(
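The classloader hunks above all serve one invariant, now documented on renameSegmentIndexFilesJob: the caller is responsible for making the hadoopy classloader current, and the job method simply reads it back from the thread instead of building and swapping a second loader of its own. A minimal sketch of that pattern, with hypothetical helper names (runWithHadoopLoader, renameJob) standing in for the real task plumbing:

    public class ContextClassLoaderSketch
    {
      // Caller side: install the Hadoop loader, run the job, always restore.
      static void runWithHadoopLoader(ClassLoader hadoopLoader, Runnable job)
      {
        final ClassLoader previous = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(hadoopLoader);
        try {
          job.run();
        }
        finally {
          Thread.currentThread().setContextClassLoader(previous);
        }
      }

      // Job side: resolve classes via whatever loader the caller installed.
      static void renameJob()
      {
        try {
          Class<?> clazz = Thread.currentThread()
              .getContextClassLoader()
              .loadClass("java.lang.String"); // placeholder for the runner class name
          System.out.println("loaded " + clazz.getName() + " via the context classloader");
        }
        catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }

      public static void main(String[] args)
      {
        runWithHadoopLoader(ContextClassLoaderSketch.class.getClassLoader(), ContextClassLoaderSketch::renameJob);
      }
    }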


@@ -31,4 +31,4 @@ AWS_REGION=<OVERRIDE_THIS>
 druid_extensions_loadList=["druid-s3-extensions","druid-hdfs-storage"]
-druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.8.5", "org.apache.hadoop:hadoop-aws:2.8.5"]
+druid_indexer_task_defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.8.5", "org.apache.hadoop:hadoop-aws:2.8.5"]


@@ -32,4 +32,4 @@ AWS_REGION=<OVERRIDE_THIS>
 druid_extensions_loadList=["druid-s3-extensions","druid-hdfs-storage"]
-druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.8.5", "org.apache.hadoop:hadoop-aws:2.8.5"]
+druid_indexer_task_defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.8.5", "org.apache.hadoop:hadoop-aws:2.8.5"]
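The last two hunks make the same fix in both integration-test environment files: these are env-file overrides consumed by the test containers, where dotted Java property names are not valid keys, so the dots must be written as underscores for the override to reach the druid.indexer.task.defaultHadoopCoordinates runtime property. An illustration of the naming convention, assuming the dot-to-underscore translation implied by the neighboring druid_extensions_loadList entry:

    # env-file form, as the container consumes it
    druid_indexer_task_defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.8.5", "org.apache.hadoop:hadoop-aws:2.8.5"]
    # effective runtime property inside the Druid process
    # druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.8.5", "org.apache.hadoop:hadoop-aws:2.8.5"]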