diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java index f03c552b656..959bb548f43 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java @@ -107,6 +107,12 @@ public abstract class AbstractTask implements Task return null; } + @Override + public String getClasspathPrefix() + { + return null; + } + @Override public String toString() { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 69e5c667551..09f850eaeae 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -79,6 +79,8 @@ public class HadoopIndexTask extends AbstractTask private final HadoopIngestionSpec spec; @JsonIgnore private final List hadoopDependencyCoordinates; + @JsonIgnore + private final String classpathPrefix; /** * @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters @@ -96,7 +98,8 @@ public class HadoopIndexTask extends AbstractTask @JsonProperty("spec") HadoopIngestionSpec spec, @JsonProperty("config") HadoopIngestionSpec config, // backwards compat @JsonProperty("hadoopCoordinates") String hadoopCoordinates, - @JsonProperty("hadoopDependencyCoordinates") List hadoopDependencyCoordinates + @JsonProperty("hadoopDependencyCoordinates") List hadoopDependencyCoordinates, + @JsonProperty("classpathPrefix") String classpathPrefix ) { super( @@ -123,6 +126,8 @@ public class HadoopIndexTask extends AbstractTask // Will be defaulted to something at runtime, based on taskConfig. this.hadoopDependencyCoordinates = null; } + + this.classpathPrefix = classpathPrefix; } @Override @@ -159,6 +164,13 @@ public class HadoopIndexTask extends AbstractTask return hadoopDependencyCoordinates; } + @JsonProperty + @Override + public String getClasspathPrefix() + { + return classpathPrefix; + } + @SuppressWarnings("unchecked") @Override public TaskStatus run(TaskToolbox toolbox) throws Exception diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index f9395165f27..f554d968b01 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -98,6 +98,12 @@ public interface Task */ public QueryRunner getQueryRunner(Query query); + /** + * Returns an extra classpath that should be prepended to the default classpath when running this task. If no + * extra classpath should be prepended, this should return null or the empty string. + */ + public String getClasspathPrefix(); + /** * Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The * actions must be idempotent, since this method may be executed multiple times. This typically runs on the diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index 0c740fb9df4..7938ddff0f1 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -161,10 +161,18 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer final List command = Lists.newArrayList(); final String childHost = String.format("%s:%d", node.getHostNoPort(), childPort); + final String taskClasspath; + if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) { + taskClasspath = Joiner.on(":").join( + task.getClasspathPrefix(), + config.getClasspath() + ); + } else { + taskClasspath = config.getClasspath(); + } - command.add(config.getJavaCommand()); command.add("-cp"); - command.add(config.getClasspath()); + command.add(taskClasspath); Iterables.addAll(command, whiteSpaceSplitter.split(config.getJavaOpts())); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index fc7f13ef3ad..d565aa57e14 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -427,7 +427,8 @@ public class TaskSerdeTest null ), null, - null + null, + "blah" ); final String json = jsonMapper.writeValueAsString(task); @@ -442,5 +443,7 @@ public class TaskSerdeTest task.getSpec().getTuningConfig().getJobProperties(), task2.getSpec().getTuningConfig().getJobProperties() ); + Assert.assertEquals("blah", task.getClasspathPrefix()); + Assert.assertEquals("blah", task2.getClasspathPrefix()); } }