Merge pull request #439 from metamx/fix-hadoop

Allow multiple hadoop coordinates in preparation of Hadoop 2
fjy 2014-03-25 10:50:24 -06:00
commit 0dc864d526
2 changed files with 32 additions and 22 deletions

HadoopIndexTask.java

@@ -70,7 +70,7 @@ public class HadoopIndexTask extends AbstractTask
   @JsonIgnore
   private final HadoopDruidIndexerSchema schema;
   @JsonIgnore
-  private final String hadoopCoordinates;
+  private final List<String> hadoopDependencyCoordinates;
   /**
    * @param schema is used by the HadoopDruidIndexerJob to set up the appropriate parameters
@@ -86,7 +86,8 @@ public class HadoopIndexTask extends AbstractTask
   public HadoopIndexTask(
       @JsonProperty("id") String id,
       @JsonProperty("config") HadoopDruidIndexerSchema schema,
-      @JsonProperty("hadoopCoordinates") String hadoopCoordinates
+      @JsonProperty("hadoopCoordinates") String hadoopCoordinates,
+      @JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates
   )
   {
     super(
@@ -100,7 +101,9 @@ public class HadoopIndexTask extends AbstractTask
     Preconditions.checkArgument(schema.getUpdaterJobSpec() == null, "updaterJobSpec must be absent");
     this.schema = schema;
-    this.hadoopCoordinates = (hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates);
+    this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null ? Arrays.<String>asList(
+        hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates
+    ) : hadoopDependencyCoordinates;
   }
   @Override
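
The constructor change preserves backward compatibility: when the new hadoopDependencyCoordinates list is absent, the legacy single hadoopCoordinates value (or the default) is wrapped in a one-element list. A minimal standalone sketch of that fallback, with illustrative names (CoordinateFallback, resolveCoordinates, and DEFAULT_COORDINATE are not from the commit):

import java.util.Arrays;
import java.util.List;

public class CoordinateFallback
{
  // Stand-in for the task's defaultHadoopCoordinates value.
  private static final String DEFAULT_COORDINATE = "org.apache.hadoop:hadoop-core:1.0.3";

  // Mirrors the constructor logic: the new list wins when present; otherwise
  // the legacy single coordinate (or the default) becomes a one-element list.
  static List<String> resolveCoordinates(String legacyCoordinate, List<String> dependencyCoordinates)
  {
    if (dependencyCoordinates != null) {
      return dependencyCoordinates;
    }
    return Arrays.asList(legacyCoordinate == null ? DEFAULT_COORDINATE : legacyCoordinate);
  }

  public static void main(String[] args)
  {
    System.out.println(resolveCoordinates(null, null));
    System.out.println(resolveCoordinates("org.apache.hadoop:hadoop-client:2.3.0", null));
    System.out.println(resolveCoordinates(null, Arrays.asList("a:b:1.0", "c:d:2.0")));
  }
}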
@@ -132,20 +135,16 @@ public class HadoopIndexTask extends AbstractTask
   }
   @JsonProperty
-  public String getHadoopCoordinates()
+  public List<String> getHadoopDependencyCoordinates()
   {
-    return hadoopCoordinates;
+    return hadoopDependencyCoordinates;
   }
   @SuppressWarnings("unchecked")
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
-    // setup Hadoop
     final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
-    final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
-        aetherClient, hadoopCoordinates
-    );
     final List<URL> extensionURLs = Lists.newArrayList();
     for (String coordinate : extensionsConfig.getCoordinates()) {
@@ -161,7 +160,12 @@ public class HadoopIndexTask extends AbstractTask
     final List<URL> driverURLs = Lists.newArrayList();
     driverURLs.addAll(nonHadoopURLs);
     // put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
-    driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
+    for (String hadoopDependencyCoordinate : hadoopDependencyCoordinates) {
+      final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
+          aetherClient, hadoopDependencyCoordinate
+      );
+      driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
+    }
     final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);
     Thread.currentThread().setContextClassLoader(loader);
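
In both the old and new paths, the merged URL list ends up in a URLClassLoader constructed with a null parent, which cuts the driver classpath off from the application classloader. A runnable sketch of why that null parent matters (IsolationDemo is an illustrative name, not from the commit):

import java.net.URL;
import java.net.URLClassLoader;

public class IsolationDemo
{
  public static void main(String[] args) throws Exception
  {
    // A null parent skips the application classloader entirely; only the
    // bootstrap classes and the listed jar URLs (none here) are visible.
    URLClassLoader isolated = new URLClassLoader(new URL[0], null);

    // Core JDK classes come from the bootstrap loader, so this succeeds.
    System.out.println(isolated.loadClass("java.util.ArrayList"));

    try {
      // Classes on the application classpath are deliberately invisible.
      isolated.loadClass("IsolationDemo");
    }
    catch (ClassNotFoundException expected) {
      System.out.println("application classes are hidden: " + expected.getMessage());
    }
  }
}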

CliHadoopIndexer.java

@@ -54,6 +54,10 @@ public class CliHadoopIndexer implements Runnable
       description = "The maven coordinates to the version of hadoop to run with. Defaults to org.apache.hadoop:hadoop-core:1.0.3")
   private String hadoopCoordinates = "org.apache.hadoop:hadoop-core:1.0.3";
+  @Option(name = "hadoopDependencies",
+      description = "The maven coordinates to the version of hadoop to run with. Defaults to org.apache.hadoop:hadoop-core:1.0.3")
+  private List<String> hadoopDependencyCoordinates = Arrays.<String>asList("org.apache.hadoop:hadoop-core:1.0.3");
+
   @Inject
   private ExtensionsConfig extensionsConfig = null;
@@ -63,9 +67,6 @@ public class CliHadoopIndexer implements Runnable
   {
     try {
       final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
-      final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
-          aetherClient, hadoopCoordinates
-      );
       final List<URL> extensionURLs = Lists.newArrayList();
       for (String coordinate : extensionsConfig.getCoordinates()) {
@@ -81,7 +82,12 @@ public class CliHadoopIndexer implements Runnable
       final List<URL> driverURLs = Lists.newArrayList();
       driverURLs.addAll(nonHadoopURLs);
       // put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
-      driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
+      for (String coordinate : hadoopDependencyCoordinates) {
+        final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
+            aetherClient, coordinate
+        );
+        driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
+      }
       final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);
       Thread.currentThread().setContextClassLoader(loader);
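
The "put hadoop dependencies last" comment in both files relies on URLClassLoader consulting its URLs in order: when two jars contain the same class, the earlier jar wins. A minimal sketch of the assembly order used here (buildDriverUrls and the jar paths are illustrative, not from the commit):

import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DriverUrlOrder
{
  // Non-Hadoop jars first, Hadoop jars last: when the same class exists in
  // both (e.g. an httpcore class), the earlier, non-Hadoop copy is found first.
  static List<URL> buildDriverUrls(List<URL> nonHadoopUrls, List<URL> hadoopUrls)
  {
    final List<URL> driverUrls = new ArrayList<>(nonHadoopUrls);
    driverUrls.addAll(hadoopUrls);
    return driverUrls;
  }

  public static void main(String[] args) throws Exception
  {
    // Hypothetical jar locations, just to show the resulting lookup order.
    List<URL> nonHadoop = Arrays.asList(new URL("file:///opt/druid/lib/httpcore-4.2.jar"));
    List<URL> hadoop = Arrays.asList(new URL("file:///opt/hadoop/hadoop-core-1.0.3.jar"));
    System.out.println(buildDriverUrls(nonHadoop, hadoop));
  }
}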