fix hadoop

This commit is contained in:
fjy 2014-03-24 18:43:31 -07:00
parent 60c204afeb
commit 5ebb2d27e3
2 changed files with 31 additions and 22 deletions

View File

@ -70,7 +70,7 @@ public class HadoopIndexTask extends AbstractTask
@JsonIgnore
private final HadoopDruidIndexerSchema schema;
@JsonIgnore
private final String hadoopCoordinates;
private final List<String> hadoopDependencyCoordinates;
/**
* @param schema is used by the HadoopDruidIndexerJob to set up the appropriate parameters
@ -86,7 +86,7 @@ public class HadoopIndexTask extends AbstractTask
public HadoopIndexTask(
@JsonProperty("id") String id,
@JsonProperty("config") HadoopDruidIndexerSchema schema,
@JsonProperty("hadoopCoordinates") String hadoopCoordinates
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates
)
{
super(
@ -100,7 +100,9 @@ public class HadoopIndexTask extends AbstractTask
Preconditions.checkArgument(schema.getUpdaterJobSpec() == null, "updaterJobSpec must be absent");
this.schema = schema;
this.hadoopCoordinates = (hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates);
this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null ? Arrays.<String>asList(
defaultHadoopCoordinates
) : hadoopDependencyCoordinates;
}
@Override
@ -132,20 +134,16 @@ public class HadoopIndexTask extends AbstractTask
}
@JsonProperty
public String getHadoopCoordinates()
public List<String> getHadoopDependencyCoordinates()
{
return hadoopCoordinates;
return hadoopDependencyCoordinates;
}
@SuppressWarnings("unchecked")
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// setup Hadoop
final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
aetherClient, hadoopCoordinates
);
final List<URL> extensionURLs = Lists.newArrayList();
for (String coordinate : extensionsConfig.getCoordinates()) {
@ -161,7 +159,12 @@ public class HadoopIndexTask extends AbstractTask
final List<URL> driverURLs = Lists.newArrayList();
driverURLs.addAll(nonHadoopURLs);
// put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
for (String hadoopDependencyCoordinate : hadoopDependencyCoordinates) {
final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
aetherClient, hadoopDependencyCoordinate
);
driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
}
final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);
Thread.currentThread().setContextClassLoader(loader);
@ -240,10 +243,10 @@ public class HadoopIndexTask extends AbstractTask
String version = args[1];
final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
.readValue(
schema,
HadoopDruidIndexerSchema.class
);
.readValue(
schema,
HadoopDruidIndexerSchema.class
);
final HadoopDruidIndexerConfig config =
new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
.withVersion(version)
@ -269,10 +272,10 @@ public class HadoopIndexTask extends AbstractTask
final String segmentOutputPath = args[2];
final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
.readValue(
schema,
HadoopDruidIndexerSchema.class
);
.readValue(
schema,
HadoopDruidIndexerSchema.class
);
final HadoopDruidIndexerConfig config =
new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
.withWorkingPath(workingPath)

View File

@ -54,6 +54,10 @@ public class CliHadoopIndexer implements Runnable
description = "The maven coordinates to the version of hadoop to run with. Defaults to org.apache.hadoop:hadoop-core:1.0.3")
private String hadoopCoordinates = "org.apache.hadoop:hadoop-core:1.0.3";
@Option(name = "hadoopDependencies",
description = "The maven coordinates to the version of hadoop to run with. Defaults to org.apache.hadoop:hadoop-core:1.0.3")
private List<String> hadoopDependencyCoordinates = Arrays.<String>asList("org.apache.hadoop:hadoop-core:1.0.3");
@Inject
private ExtensionsConfig extensionsConfig = null;
@ -63,9 +67,6 @@ public class CliHadoopIndexer implements Runnable
{
try {
final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
aetherClient, hadoopCoordinates
);
final List<URL> extensionURLs = Lists.newArrayList();
for (String coordinate : extensionsConfig.getCoordinates()) {
@ -81,7 +82,12 @@ public class CliHadoopIndexer implements Runnable
final List<URL> driverURLs = Lists.newArrayList();
driverURLs.addAll(nonHadoopURLs);
// put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
for (String coordinate : hadoopDependencyCoordinates) {
final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
aetherClient, coordinate
);
driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
}
final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);
Thread.currentThread().setContextClassLoader(loader);