Merge branch 'master' of github.com:metamx/druid

Conflicts:
	services/src/main/java/io/druid/cli/CliHadoopIndexer.java
fjy 2014-03-25 10:04:14 -07:00
commit eec07ad45f
2 changed files with 28 additions and 22 deletions

indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java

@@ -70,7 +70,7 @@ public class HadoopIndexTask extends AbstractTask
   @JsonIgnore
   private final HadoopDruidIndexerSchema schema;
   @JsonIgnore
-  private final String hadoopCoordinates;
+  private final List<String> hadoopDependencyCoordinates;
 
   /**
    * @param schema is used by the HadoopDruidIndexerJob to set up the appropriate parameters
@@ -86,7 +86,8 @@ public class HadoopIndexTask extends AbstractTask
   public HadoopIndexTask(
       @JsonProperty("id") String id,
       @JsonProperty("config") HadoopDruidIndexerSchema schema,
-      @JsonProperty("hadoopCoordinates") String hadoopCoordinates
+      @JsonProperty("hadoopCoordinates") String hadoopCoordinates,
+      @JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates
   )
   {
     super(
@@ -100,7 +101,9 @@ public class HadoopIndexTask extends AbstractTask
     Preconditions.checkArgument(schema.getUpdaterJobSpec() == null, "updaterJobSpec must be absent");
     this.schema = schema;
-    this.hadoopCoordinates = (hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates);
+    this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null ? Arrays.<String>asList(
+        hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates
+    ) : hadoopDependencyCoordinates;
   }
 
   @Override
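Taken together, the two hunks above keep old task specs working: hadoopDependencyCoordinates takes precedence when supplied, and a legacy hadoopCoordinates string (or the built-in default) is normalized into a one-element list. A minimal, self-contained Jackson sketch of that fallback follows; TaskSpec and the default coordinate value are illustrative stand-ins, not Druid's actual classes.

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for HadoopIndexTask's JSON surface; not Druid code.
class TaskSpec
{
  // Assumed default; the real defaultHadoopCoordinates value lives in HadoopIndexTask.
  private static final String DEFAULT_HADOOP_COORDINATES = "org.apache.hadoop:hadoop-client:2.3.0";

  final List<String> hadoopDependencyCoordinates;

  @JsonCreator
  TaskSpec(
      @JsonProperty("hadoopCoordinates") String hadoopCoordinates,
      @JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates
  )
  {
    // Same fallback as the constructor above: the new list wins; otherwise the
    // legacy single coordinate (or the default) becomes a one-element list.
    this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null
        ? Arrays.asList(hadoopCoordinates == null ? DEFAULT_HADOOP_COORDINATES : hadoopCoordinates)
        : hadoopDependencyCoordinates;
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    // A legacy spec still parses and is normalized to a list.
    System.out.println(mapper.readValue(
        "{\"hadoopCoordinates\": \"org.apache.hadoop:hadoop-client:2.3.0\"}", TaskSpec.class
    ).hadoopDependencyCoordinates);
    // A new-style spec can carry several coordinates at once.
    System.out.println(mapper.readValue(
        "{\"hadoopDependencyCoordinates\": [\"org.apache.hadoop:hadoop-client:2.3.0\", \"org.apache.hadoop:hadoop-hdfs:2.3.0\"]}",
        TaskSpec.class
    ).hadoopDependencyCoordinates);
  }
}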
@@ -132,20 +135,16 @@ public class HadoopIndexTask extends AbstractTask
   }
 
   @JsonProperty
-  public String getHadoopCoordinates()
+  public List<String> getHadoopDependencyCoordinates()
   {
-    return hadoopCoordinates;
+    return hadoopDependencyCoordinates;
   }
 
   @SuppressWarnings("unchecked")
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
-    // setup Hadoop
     final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
-    final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
-        aetherClient, hadoopCoordinates
-    );
 
     final List<URL> extensionURLs = Lists.newArrayList();
     for (String coordinate : extensionsConfig.getCoordinates()) {
@@ -161,7 +160,12 @@ public class HadoopIndexTask extends AbstractTask
     final List<URL> driverURLs = Lists.newArrayList();
     driverURLs.addAll(nonHadoopURLs);
     // put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
-    driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
+    for (String hadoopDependencyCoordinate : hadoopDependencyCoordinates) {
+      final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
+          aetherClient, hadoopDependencyCoordinate
+      );
+      driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
+    }
 
     final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);
     Thread.currentThread().setContextClassLoader(loader);
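The loop above resolves one classloader per Hadoop dependency coordinate and appends its URLs after the non-Hadoop extension URLs, so extension-supplied versions of contested libraries such as jets3t and httpcore are found first; CliHadoopIndexer below gets the same treatment. A rough sketch of that assembly, assuming the per-coordinate URLs have already been resolved (class and method names here are illustrative):

import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: assembles a driver classloader from pre-resolved URLs.
class DriverLoaderSketch
{
  static URLClassLoader build(List<URL> nonHadoopURLs, List<List<URL>> hadoopURLsPerCoordinate)
  {
    final List<URL> driverURLs = new ArrayList<>(nonHadoopURLs);
    // Hadoop dependency URLs go last: a URLClassLoader searches URLs in order,
    // so the extension jars shadow Hadoop's jets3t / httpcore versions.
    for (List<URL> urls : hadoopURLsPerCoordinate) {
      driverURLs.addAll(urls);
    }
    // A null parent isolates the driver classpath from the host application.
    return new URLClassLoader(driverURLs.toArray(new URL[0]), null);
  }
}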
@@ -240,10 +244,10 @@ public class HadoopIndexTask extends AbstractTask
       String version = args[1];
 
       final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
-              .readValue(
-                  schema,
-                  HadoopDruidIndexerSchema.class
-              );
+          .readValue(
+              schema,
+              HadoopDruidIndexerSchema.class
+          );
       final HadoopDruidIndexerConfig config =
           new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
                                                .withVersion(version)
@@ -269,10 +273,10 @@ public class HadoopIndexTask extends AbstractTask
       final String segmentOutputPath = args[2];
 
       final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
-              .readValue(
-                  schema,
-                  HadoopDruidIndexerSchema.class
-              );
+          .readValue(
+              schema,
+              HadoopDruidIndexerSchema.class
+          );
       final HadoopDruidIndexerConfig config =
           new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
                                                .withWorkingPath(workingPath)

services/src/main/java/io/druid/cli/CliHadoopIndexer.java

@@ -67,9 +67,6 @@ public class CliHadoopIndexer implements Runnable
   {
     try {
       final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
-      final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
-          aetherClient, hadoopCoordinates
-      );
 
       final List<URL> extensionURLs = Lists.newArrayList();
       for (String coordinate : extensionsConfig.getCoordinates()) {
@@ -85,7 +82,12 @@ public class CliHadoopIndexer implements Runnable
       final List<URL> driverURLs = Lists.newArrayList();
       driverURLs.addAll(nonHadoopURLs);
       // put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
-      driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
+      for (String coordinate : hadoopDependencyCoordinates) {
+        final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
+            aetherClient, coordinate
+        );
+        driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
+      }
 
       final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);
       Thread.currentThread().setContextClassLoader(loader);
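With the context classloader pointed at the isolated loader, the driver class is then loaded and invoked reflectively through it, so every Hadoop class it touches resolves against the URL list assembled above. A hedged sketch of that launch pattern (the driver class name and entry point are placeholders, not necessarily what CliHadoopIndexer calls):

import java.lang.reflect.Method;
import java.net.URLClassLoader;

// Illustrative reflective launch through an isolated classloader.
class ReflectiveLaunchSketch
{
  static void run(URLClassLoader loader, String driverClassName, String[] args) throws Exception
  {
    Thread.currentThread().setContextClassLoader(loader);
    // Loading via the isolated loader keeps the driver's Hadoop dependencies
    // resolved from the URL list built above, not from the host classpath.
    final Class<?> driverClass = Class.forName(driverClassName, true, loader);
    final Method mainMethod = driverClass.getMethod("main", String[].class);
    mainMethod.invoke(null, new Object[]{args});
  }
}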