From 5ebb2d27e36cde7fa4cad8a409b25c2a8611a952 Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 24 Mar 2014 18:43:31 -0700 Subject: [PATCH 1/2] fix hadoop --- .../indexing/common/task/HadoopIndexTask.java | 39 ++++++++++--------- .../java/io/druid/cli/CliHadoopIndexer.java | 14 +++++-- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 878f950f0c4..c9e971bdeeb 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -70,7 +70,7 @@ public class HadoopIndexTask extends AbstractTask @JsonIgnore private final HadoopDruidIndexerSchema schema; @JsonIgnore - private final String hadoopCoordinates; + private final List hadoopDependencyCoordinates; /** * @param schema is used by the HadoopDruidIndexerJob to set up the appropriate parameters @@ -86,7 +86,7 @@ public class HadoopIndexTask extends AbstractTask public HadoopIndexTask( @JsonProperty("id") String id, @JsonProperty("config") HadoopDruidIndexerSchema schema, - @JsonProperty("hadoopCoordinates") String hadoopCoordinates + @JsonProperty("hadoopDependencyCoordinates") List hadoopDependencyCoordinates ) { super( @@ -100,7 +100,9 @@ public class HadoopIndexTask extends AbstractTask Preconditions.checkArgument(schema.getUpdaterJobSpec() == null, "updaterJobSpec must be absent"); this.schema = schema; - this.hadoopCoordinates = (hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates); + this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null ? Arrays.asList( + defaultHadoopCoordinates + ) : hadoopDependencyCoordinates; } @Override @@ -132,20 +134,16 @@ public class HadoopIndexTask extends AbstractTask } @JsonProperty - public String getHadoopCoordinates() + public List getHadoopDependencyCoordinates() { - return hadoopCoordinates; + return hadoopDependencyCoordinates; } @SuppressWarnings("unchecked") @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - // setup Hadoop final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig); - final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates( - aetherClient, hadoopCoordinates - ); final List extensionURLs = Lists.newArrayList(); for (String coordinate : extensionsConfig.getCoordinates()) { @@ -161,7 +159,12 @@ public class HadoopIndexTask extends AbstractTask final List driverURLs = Lists.newArrayList(); driverURLs.addAll(nonHadoopURLs); // put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts - driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs())); + for (String hadoopDependencyCoordinate : hadoopDependencyCoordinates) { + final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates( + aetherClient, hadoopDependencyCoordinate + ); + driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs())); + } final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null); Thread.currentThread().setContextClassLoader(loader); @@ -240,10 +243,10 @@ public class HadoopIndexTask extends AbstractTask String version = args[1]; final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper - .readValue( - schema, - HadoopDruidIndexerSchema.class - ); + .readValue( + schema, + HadoopDruidIndexerSchema.class + ); final HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfigBuilder().withSchema(theSchema) .withVersion(version) @@ -269,10 +272,10 @@ public class HadoopIndexTask extends AbstractTask final String segmentOutputPath = args[2]; final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper - .readValue( - schema, - HadoopDruidIndexerSchema.class - ); + .readValue( + schema, + HadoopDruidIndexerSchema.class + ); final HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfigBuilder().withSchema(theSchema) .withWorkingPath(workingPath) diff --git a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java index d78a67e73a6..6a2f255fe1f 100644 --- a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java +++ b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java @@ -54,6 +54,10 @@ public class CliHadoopIndexer implements Runnable description = "The maven coordinates to the version of hadoop to run with. Defaults to org.apache.hadoop:hadoop-core:1.0.3") private String hadoopCoordinates = "org.apache.hadoop:hadoop-core:1.0.3"; + @Option(name = "hadoopDependencies", + description = "The maven coordinates to the version of hadoop to run with. Defaults to org.apache.hadoop:hadoop-core:1.0.3") + private List hadoopDependencyCoordinates = Arrays.asList("org.apache.hadoop:hadoop-core:1.0.3"); + @Inject private ExtensionsConfig extensionsConfig = null; @@ -63,9 +67,6 @@ public class CliHadoopIndexer implements Runnable { try { final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig); - final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates( - aetherClient, hadoopCoordinates - ); final List extensionURLs = Lists.newArrayList(); for (String coordinate : extensionsConfig.getCoordinates()) { @@ -81,7 +82,12 @@ public class CliHadoopIndexer implements Runnable final List driverURLs = Lists.newArrayList(); driverURLs.addAll(nonHadoopURLs); // put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts - driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs())); + for (String coordinate : hadoopDependencyCoordinates) { + final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates( + aetherClient, coordinate + ); + driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs())); + } final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null); Thread.currentThread().setContextClassLoader(loader); From 771aa2ae68e232447a0d8ace2fa1a5a0fcb32cba Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 25 Mar 2014 09:23:48 -0700 Subject: [PATCH 2/2] backwards compat --- .../java/io/druid/indexing/common/task/HadoopIndexTask.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index c9e971bdeeb..ca38f90e3cc 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -86,6 +86,7 @@ public class HadoopIndexTask extends AbstractTask public HadoopIndexTask( @JsonProperty("id") String id, @JsonProperty("config") HadoopDruidIndexerSchema schema, + @JsonProperty("hadoopCoordinates") String hadoopCoordinates, @JsonProperty("hadoopDependencyCoordinates") List hadoopDependencyCoordinates ) { @@ -101,7 +102,7 @@ public class HadoopIndexTask extends AbstractTask this.schema = schema; this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null ? Arrays.asList( - defaultHadoopCoordinates + hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates ) : hadoopDependencyCoordinates; }