diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 878f950f0c4..ca38f90e3cc 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -70,7 +70,7 @@ public class HadoopIndexTask extends AbstractTask
   @JsonIgnore
   private final HadoopDruidIndexerSchema schema;
   @JsonIgnore
-  private final String hadoopCoordinates;
+  private final List<String> hadoopDependencyCoordinates;
 
   /**
    * @param schema is used by the HadoopDruidIndexerJob to set up the appropriate parameters
@@ -86,7 +86,8 @@ public class HadoopIndexTask extends AbstractTask
   public HadoopIndexTask(
       @JsonProperty("id") String id,
       @JsonProperty("config") HadoopDruidIndexerSchema schema,
-      @JsonProperty("hadoopCoordinates") String hadoopCoordinates
+      @JsonProperty("hadoopCoordinates") String hadoopCoordinates,
+      @JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates
   )
   {
     super(
@@ -100,7 +101,9 @@ public class HadoopIndexTask extends AbstractTask
     Preconditions.checkArgument(schema.getUpdaterJobSpec() == null, "updaterJobSpec must be absent");
 
     this.schema = schema;
-    this.hadoopCoordinates = (hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates);
+    this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null ? Arrays.asList(
+        hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates
+    ) : hadoopDependencyCoordinates;
   }
 
   @Override
@@ -132,20 +135,16 @@ public class HadoopIndexTask extends AbstractTask
   }
 
   @JsonProperty
-  public String getHadoopCoordinates()
+  public List<String> getHadoopDependencyCoordinates()
   {
-    return hadoopCoordinates;
+    return hadoopDependencyCoordinates;
   }
 
   @SuppressWarnings("unchecked")
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
-    // setup Hadoop
     final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
-    final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
-        aetherClient, hadoopCoordinates
-    );
 
     final List<URL> extensionURLs = Lists.newArrayList();
     for (String coordinate : extensionsConfig.getCoordinates()) {
@@ -161,7 +160,12 @@ public class HadoopIndexTask extends AbstractTask
     final List<URL> driverURLs = Lists.newArrayList();
     driverURLs.addAll(nonHadoopURLs);
     // put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
-    driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
+    for (String hadoopDependencyCoordinate : hadoopDependencyCoordinates) {
+      final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
+          aetherClient, hadoopDependencyCoordinate
+      );
+      driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
+    }
 
     final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);
     Thread.currentThread().setContextClassLoader(loader);
@@ -240,10 +244,10 @@ public class HadoopIndexTask extends AbstractTask
       String version = args[1];
 
       final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
-          .readValue(
-              schema,
-              HadoopDruidIndexerSchema.class
-          );
+                                                                         .readValue(
+                                                                             schema,
+                                                                             HadoopDruidIndexerSchema.class
+                                                                         );
       final HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
                                                                                    .withVersion(version)
@@ -269,10 +273,10 @@ public class HadoopIndexTask extends AbstractTask
       final String segmentOutputPath = args[2];
 
       final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
-          .readValue(
-              schema,
-              HadoopDruidIndexerSchema.class
-          );
+                                                                         .readValue(
+                                                                             schema,
+                                                                             HadoopDruidIndexerSchema.class
+                                                                         );
       final HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
                                                                                    .withWorkingPath(workingPath)
diff --git a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java
index d78a67e73a6..6a2f255fe1f 100644
--- a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java
+++ b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java
@@ -54,6 +54,10 @@ public class CliHadoopIndexer implements Runnable
       description = "The maven coordinates to the version of hadoop to run with. Defaults to org.apache.hadoop:hadoop-core:1.0.3")
   private String hadoopCoordinates = "org.apache.hadoop:hadoop-core:1.0.3";
 
+  @Option(name = "hadoopDependencies",
+      description = "The maven coordinates of the hadoop dependencies to run with. Defaults to org.apache.hadoop:hadoop-core:1.0.3")
+  private List<String> hadoopDependencyCoordinates = Arrays.asList("org.apache.hadoop:hadoop-core:1.0.3");
+
   @Inject
   private ExtensionsConfig extensionsConfig = null;
 
@@ -63,9 +67,6 @@ public class CliHadoopIndexer implements Runnable
   {
     try {
       final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
-      final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
-          aetherClient, hadoopCoordinates
-      );
 
       final List<URL> extensionURLs = Lists.newArrayList();
       for (String coordinate : extensionsConfig.getCoordinates()) {
@@ -81,7 +82,12 @@ public class CliHadoopIndexer implements Runnable
       final List<URL> driverURLs = Lists.newArrayList();
       driverURLs.addAll(nonHadoopURLs);
       // put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
-      driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
+      for (String coordinate : hadoopDependencyCoordinates) {
+        final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
+            aetherClient, coordinate
+        );
+        driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
+      }
 
       final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);
       Thread.currentThread().setContextClassLoader(loader);
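Usage note (not part of the patch): a minimal sketch of how the revised HadoopIndexTask constructor is expected to behave, based only on the constructor logic in the first hunk. The task ids, the hadoop-client coordinate, and the import path assumed for HadoopDruidIndexerSchema are illustrative, not taken from the diff. When hadoopDependencyCoordinates is null, the constructor wraps the legacy hadoopCoordinates string (or the built-in default) in a one-element list; when the list is supplied, it takes precedence and the legacy string is ignored.

import java.util.Arrays;

import io.druid.indexer.HadoopDruidIndexerSchema;            // assumed package
import io.druid.indexing.common.task.HadoopIndexTask;

public class HadoopIndexTaskExample
{
  // Legacy-style construction: no dependency list, so the single legacy
  // coordinate (or the default) ends up as a one-element dependency list.
  public static HadoopIndexTask legacyStyle(HadoopDruidIndexerSchema schema)
  {
    return new HadoopIndexTask(
        "index_hadoop_legacy_example",              // illustrative task id
        schema,
        "org.apache.hadoop:hadoop-core:1.0.3",      // legacy single coordinate
        null                                        // hadoopDependencyCoordinates absent
    );
  }

  // New-style construction: an explicit dependency list is used as-is, and each
  // coordinate gets its own classloader whose URLs are appended after the
  // non-hadoop extension URLs, as the run() hunk above shows.
  public static HadoopIndexTask listStyle(HadoopDruidIndexerSchema schema)
  {
    return new HadoopIndexTask(
        "index_hadoop_list_example",
        schema,
        null,
        Arrays.asList("org.apache.hadoop:hadoop-client:2.3.0")  // illustrative coordinate
    );
  }
}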