mirror of https://github.com/apache/druid.git
Perform lazy initialization of parquet extensions module (#12827)
Historicals and middle managers crash with an `UnknownHostException` on trying to load `druid-parquet-extensions` with an ephemeral Hadoop cluster. This happens because the `fs.defaultFS` URI value cannot be resolved at startup, since the Hadoop cluster may not yet exist at that time. This commit fixes the error by performing initialization of the filesystem in `ParquetInputFormat.createReader()` whenever a new reader is requested.
This commit is contained in:
parent
6046a392b6
commit
cceb2e849e
|
@ -30,9 +30,7 @@ import org.apache.druid.data.input.parquet.simple.ParquetHadoopInputRowParser;
|
||||||
import org.apache.druid.data.input.parquet.simple.ParquetParseSpec;
|
import org.apache.druid.data.input.parquet.simple.ParquetParseSpec;
|
||||||
import org.apache.druid.initialization.DruidModule;
|
import org.apache.druid.initialization.DruidModule;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
@ -77,20 +75,6 @@ public class ParquetExtensionsModule implements DruidModule
|
||||||
// Set explicit CL. Otherwise it'll try to use thread context CL, which may not have all of our dependencies.
|
// Set explicit CL. Otherwise it'll try to use thread context CL, which may not have all of our dependencies.
|
||||||
conf.setClassLoader(getClass().getClassLoader());
|
conf.setClassLoader(getClass().getClassLoader());
|
||||||
|
|
||||||
// Ensure that FileSystem class level initialization happens with correct CL
|
|
||||||
// See https://github.com/apache/druid/issues/1714
|
|
||||||
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
|
|
||||||
try {
|
|
||||||
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
|
|
||||||
FileSystem.get(conf);
|
|
||||||
}
|
|
||||||
catch (IOException ex) {
|
|
||||||
throw new RuntimeException(ex);
|
|
||||||
}
|
|
||||||
finally {
|
|
||||||
Thread.currentThread().setContextClassLoader(currCtxCl);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (props != null) {
|
if (props != null) {
|
||||||
for (String propName : props.stringPropertyNames()) {
|
for (String propName : props.stringPropertyNames()) {
|
||||||
if (propName.startsWith("hadoop.")) {
|
if (propName.startsWith("hadoop.")) {
|
||||||
|
|
|
@ -29,9 +29,11 @@ import org.apache.druid.data.input.impl.NestedInputFormat;
|
||||||
import org.apache.druid.data.input.parquet.guice.Parquet;
|
import org.apache.druid.data.input.parquet.guice.Parquet;
|
||||||
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
|
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
public class ParquetInputFormat extends NestedInputFormat
|
public class ParquetInputFormat extends NestedInputFormat
|
||||||
|
@ -51,6 +53,26 @@ public class ParquetInputFormat extends NestedInputFormat
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void initialize(Configuration conf)
|
||||||
|
{
|
||||||
|
// Initializing separately since during eager initialization, resolving
|
||||||
|
// namenode hostname throws an error if nodes are ephemeral
|
||||||
|
|
||||||
|
// Ensure that FileSystem class level initialization happens with correct CL
|
||||||
|
// See https://github.com/apache/druid/issues/1714
|
||||||
|
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
|
||||||
|
try {
|
||||||
|
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
|
||||||
|
FileSystem.get(conf);
|
||||||
|
}
|
||||||
|
catch (IOException ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
Thread.currentThread().setContextClassLoader(currCtxCl);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public boolean getBinaryAsString()
|
public boolean getBinaryAsString()
|
||||||
{
|
{
|
||||||
|
@ -70,6 +92,7 @@ public class ParquetInputFormat extends NestedInputFormat
|
||||||
File temporaryDirectory
|
File temporaryDirectory
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
initialize(conf);
|
||||||
return new ParquetReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString);
|
return new ParquetReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue