mirror of https://github.com/apache/druid.git
Allow indexing tasks to specify extra classpaths.
This could be used by Hadoop tasks to reference configs for different clusters, assuming that the possible configs have been pre-distributed to middle managers.
This commit is contained in:
parent
08b6a2677d
commit
68aeafaacd
|
@ -107,6 +107,12 @@ public abstract class AbstractTask implements Task
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getClasspathPrefix()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
|
|
|
@ -79,6 +79,8 @@ public class HadoopIndexTask extends AbstractTask
|
||||||
private final HadoopIngestionSpec spec;
|
private final HadoopIngestionSpec spec;
|
||||||
@JsonIgnore
|
@JsonIgnore
|
||||||
private final List<String> hadoopDependencyCoordinates;
|
private final List<String> hadoopDependencyCoordinates;
|
||||||
|
@JsonIgnore
|
||||||
|
private final String classpathPrefix;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
|
* @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
|
||||||
|
@ -96,7 +98,8 @@ public class HadoopIndexTask extends AbstractTask
|
||||||
@JsonProperty("spec") HadoopIngestionSpec spec,
|
@JsonProperty("spec") HadoopIngestionSpec spec,
|
||||||
@JsonProperty("config") HadoopIngestionSpec config, // backwards compat
|
@JsonProperty("config") HadoopIngestionSpec config, // backwards compat
|
||||||
@JsonProperty("hadoopCoordinates") String hadoopCoordinates,
|
@JsonProperty("hadoopCoordinates") String hadoopCoordinates,
|
||||||
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates
|
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates,
|
||||||
|
@JsonProperty("classpathPrefix") String classpathPrefix
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(
|
super(
|
||||||
|
@ -123,6 +126,8 @@ public class HadoopIndexTask extends AbstractTask
|
||||||
// Will be defaulted to something at runtime, based on taskConfig.
|
// Will be defaulted to something at runtime, based on taskConfig.
|
||||||
this.hadoopDependencyCoordinates = null;
|
this.hadoopDependencyCoordinates = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.classpathPrefix = classpathPrefix;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -159,6 +164,13 @@ public class HadoopIndexTask extends AbstractTask
|
||||||
return hadoopDependencyCoordinates;
|
return hadoopDependencyCoordinates;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
@Override
|
||||||
|
public String getClasspathPrefix()
|
||||||
|
{
|
||||||
|
return classpathPrefix;
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public TaskStatus run(TaskToolbox toolbox) throws Exception
|
public TaskStatus run(TaskToolbox toolbox) throws Exception
|
||||||
|
|
|
@ -98,6 +98,12 @@ public interface Task
|
||||||
*/
|
*/
|
||||||
public <T> QueryRunner<T> getQueryRunner(Query<T> query);
|
public <T> QueryRunner<T> getQueryRunner(Query<T> query);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an extra classpath that should be prepended to the default classpath when running this task. If no
|
||||||
|
* extra classpath should be prepended, this should return null or the empty string.
|
||||||
|
*/
|
||||||
|
public String getClasspathPrefix();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The
|
* Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The
|
||||||
* actions must be idempotent, since this method may be executed multiple times. This typically runs on the
|
* actions must be idempotent, since this method may be executed multiple times. This typically runs on the
|
||||||
|
|
|
@ -161,10 +161,18 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
||||||
|
|
||||||
final List<String> command = Lists.newArrayList();
|
final List<String> command = Lists.newArrayList();
|
||||||
final String childHost = String.format("%s:%d", node.getHostNoPort(), childPort);
|
final String childHost = String.format("%s:%d", node.getHostNoPort(), childPort);
|
||||||
|
final String taskClasspath;
|
||||||
|
if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
|
||||||
|
taskClasspath = Joiner.on(":").join(
|
||||||
|
task.getClasspathPrefix(),
|
||||||
|
config.getClasspath()
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
taskClasspath = config.getClasspath();
|
||||||
|
}
|
||||||
|
|
||||||
command.add(config.getJavaCommand());
|
|
||||||
command.add("-cp");
|
command.add("-cp");
|
||||||
command.add(config.getClasspath());
|
command.add(taskClasspath);
|
||||||
|
|
||||||
Iterables.addAll(command, whiteSpaceSplitter.split(config.getJavaOpts()));
|
Iterables.addAll(command, whiteSpaceSplitter.split(config.getJavaOpts()));
|
||||||
|
|
||||||
|
|
|
@ -427,7 +427,8 @@ public class TaskSerdeTest
|
||||||
null
|
null
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
null
|
null,
|
||||||
|
"blah"
|
||||||
);
|
);
|
||||||
|
|
||||||
final String json = jsonMapper.writeValueAsString(task);
|
final String json = jsonMapper.writeValueAsString(task);
|
||||||
|
@ -442,5 +443,7 @@ public class TaskSerdeTest
|
||||||
task.getSpec().getTuningConfig().getJobProperties(),
|
task.getSpec().getTuningConfig().getJobProperties(),
|
||||||
task2.getSpec().getTuningConfig().getJobProperties()
|
task2.getSpec().getTuningConfig().getJobProperties()
|
||||||
);
|
);
|
||||||
|
Assert.assertEquals("blah", task.getClasspathPrefix());
|
||||||
|
Assert.assertEquals("blah", task2.getClasspathPrefix());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue