Indexing: Make default hadoop coordinates configurable.

commit 54833fa43d
parent 964f12b7d6
Author: Gian Merlino
Date:   2014-06-17 08:53:39 -07:00
2 changed files with 37 additions and 15 deletions
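
Summary of the change: TaskConfig grows a defaultHadoopCoordinates property (falling back to org.apache.hadoop:hadoop-client:2.3.0 when unset), and HadoopIndexTask drops its hard-coded DEFAULT_HADOOP_COORDINATES constant, resolving the Hadoop dependency coordinates at run time from that config whenever the task itself does not specify any.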


@@ -21,57 +21,71 @@ package io.druid.indexing.common.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.util.List;
public class TaskConfig
{
public static List<String> DEFAULT_DEFAULT_HADOOP_COORDINATES = ImmutableList.of(
"org.apache.hadoop:hadoop-client:2.3.0"
);
@JsonProperty
private final String baseDir;
@JsonProperty
private final File baseTaskDir;
@JsonProperty
private final String hadoopWorkingPath;
@JsonProperty
private final int defaultRowFlushBoundary;
@JsonProperty
private final List<String> defaultHadoopCoordinates;
@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
@JsonProperty("baseTaskDir") String baseTaskDir,
@JsonProperty("hadoopWorkingPath") String hadoopWorkingPath,
@JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary
@JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary,
@JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates
)
{
this.baseDir = baseDir == null ? "/tmp" : baseDir;
this.baseTaskDir = new File(defaultDir(baseTaskDir, "persistent/task"));
this.hadoopWorkingPath = defaultDir(hadoopWorkingPath, "druid-indexing");
this.defaultRowFlushBoundary = defaultRowFlushBoundary == null ? 500000 : defaultRowFlushBoundary;
this.defaultHadoopCoordinates = defaultHadoopCoordinates == null
? DEFAULT_DEFAULT_HADOOP_COORDINATES
: defaultHadoopCoordinates;
}
@JsonProperty
public String getBaseDir()
{
return baseDir;
}
@JsonProperty
public File getBaseTaskDir()
{
return baseTaskDir;
}
@JsonProperty
public String getHadoopWorkingPath()
{
return hadoopWorkingPath;
}
@JsonProperty
public int getDefaultRowFlushBoundary()
{
return defaultRowFlushBoundary;
}
@JsonProperty
public List<String> getDefaultHadoopCoordinates()
{
return defaultHadoopCoordinates;
}
private String defaultDir(String configParameter, final String defaultVal)
{
if (configParameter == null) {
@@ -80,4 +94,4 @@ public class TaskConfig
return configParameter;
}
}
}
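
With the TaskConfig change above, the fallback Hadoop coordinates become an ordinary, injectable config property rather than a constant owned by HadoopIndexTask. Below is a minimal, hypothetical sketch of the defaulting behavior, assuming only jackson-databind and Guava on the classpath plus the TaskConfig class as shown (in a live cluster these fields are normally populated from runtime properties rather than hand-fed JSON):

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexing.common.config.TaskConfig;

public class TaskConfigDefaultsSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    // defaultHadoopCoordinates omitted -> DEFAULT_DEFAULT_HADOOP_COORDINATES applies.
    final TaskConfig defaulted = mapper.readValue("{\"baseDir\":\"/tmp\"}", TaskConfig.class);
    System.out.println(defaulted.getDefaultHadoopCoordinates());
    // prints: [org.apache.hadoop:hadoop-client:2.3.0]

    // An explicit value overrides the built-in default.
    final TaskConfig overridden = mapper.readValue(
        "{\"defaultHadoopCoordinates\": [\"org.apache.hadoop:hadoop-client:2.4.0\"]}",
        TaskConfig.class
    );
    System.out.println(overridden.getDefaultHadoopCoordinates());
    // prints: [org.apache.hadoop:hadoop-client:2.4.0]
  }
}

run() in the second file picks up exactly this value via toolbox.getConfig().getDefaultHadoopCoordinates().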


@@ -27,6 +27,7 @@ import com.google.api.client.util.Lists;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
@@ -65,8 +66,6 @@ public class HadoopIndexTask extends AbstractTask
extensionsConfig = Initialization.makeStartupInjector().getInstance(ExtensionsConfig.class);
}
- public static String DEFAULT_HADOOP_COORDINATES = "org.apache.hadoop:hadoop-client:2.3.0";
private static String getTheDataSource(HadoopIngestionSpec spec, HadoopIngestionSpec config)
{
if (spec != null) {
@@ -115,9 +114,14 @@ public class HadoopIndexTask extends AbstractTask
Preconditions.checkArgument(this.spec.getTuningConfig().getWorkingPath() == null, "workingPath must be absent");
Preconditions.checkArgument(this.spec.getIOConfig().getMetadataUpdateSpec() == null, "updaterJobSpec must be absent");
- this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null ? Arrays.<String>asList(
-     hadoopCoordinates == null ? DEFAULT_HADOOP_COORDINATES : hadoopCoordinates
- ) : hadoopDependencyCoordinates;
+ if (hadoopDependencyCoordinates != null) {
+   this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
+ } else if (hadoopCoordinates != null) {
+   this.hadoopDependencyCoordinates = ImmutableList.of(hadoopCoordinates);
+ } else {
+   // Will be defaulted to something at runtime, based on taskConfig.
+   this.hadoopDependencyCoordinates = null;
+ }
}
@Override
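
The rewritten constructor establishes a clear precedence: an explicit hadoopDependencyCoordinates list wins, then the legacy single hadoopCoordinates string (wrapped into a one-element list), and if neither is supplied the field is left null so that run() can fall back to the TaskConfig default. A hypothetical, self-contained sketch of that precedence, assuming only Guava (the class and method names below are illustrative, not Druid code):

import com.google.common.collect.ImmutableList;
import java.util.List;

class CoordinatePrecedenceSketch
{
  static List<String> atConstruction(List<String> hadoopDependencyCoordinates, String hadoopCoordinates)
  {
    if (hadoopDependencyCoordinates != null) {
      return hadoopDependencyCoordinates;
    } else if (hadoopCoordinates != null) {
      return ImmutableList.of(hadoopCoordinates);
    } else {
      return null; // resolved later from TaskConfig.getDefaultHadoopCoordinates()
    }
  }

  public static void main(String[] args)
  {
    System.out.println(atConstruction(ImmutableList.of("org.apache.hadoop:hadoop-client:2.4.0"), "ignored"));
    // prints: [org.apache.hadoop:hadoop-client:2.4.0]
    System.out.println(atConstruction(null, "org.apache.hadoop:hadoop-client:2.3.0"));
    // prints: [org.apache.hadoop:hadoop-client:2.3.0]
    System.out.println(atConstruction(null, null));
    // prints: null
  }
}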
@@ -158,6 +162,10 @@ public class HadoopIndexTask extends AbstractTask
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final List<String> finalHadoopDependencyCoordinates = hadoopDependencyCoordinates != null
? hadoopDependencyCoordinates
: toolbox.getConfig().getDefaultHadoopCoordinates();
final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
final List<URL> extensionURLs = Lists.newArrayList();
@@ -174,7 +182,7 @@ public class HadoopIndexTask extends AbstractTask
final List<URL> driverURLs = Lists.newArrayList();
driverURLs.addAll(nonHadoopURLs);
// put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
- for (String hadoopDependencyCoordinate : hadoopDependencyCoordinates) {
+ for (String hadoopDependencyCoordinate : finalHadoopDependencyCoordinates) {
final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
aetherClient, hadoopDependencyCoordinate
);
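
At run() time the remaining null case is resolved against the toolbox's TaskConfig, so an operator can set the cluster-wide default once, presumably via the runtime property group that backs TaskConfig (in this era something like druid.indexer.task.defaultHadoopCoordinates, though the exact property name is an assumption here), while any individual task can still override it per submission. A small sketch of how the two pieces compose, reusing TaskConfig from the first file; direct construction of TaskConfig below stands in for however the toolbox's config is actually injected:

import com.google.common.collect.ImmutableList;
import io.druid.indexing.common.config.TaskConfig;
import java.util.List;

public class RunTimeFallbackSketch
{
  static List<String> finalCoordinates(List<String> taskLevel, TaskConfig taskConfig)
  {
    // Mirrors the new logic in run(): a task-level list wins, otherwise the
    // cluster-wide default from TaskConfig applies.
    return taskLevel != null ? taskLevel : taskConfig.getDefaultHadoopCoordinates();
  }

  public static void main(String[] args)
  {
    final TaskConfig config = new TaskConfig(
        "/tmp", null, null, null,
        ImmutableList.of("org.apache.hadoop:hadoop-client:2.4.0") // cluster-wide override
    );
    System.out.println(finalCoordinates(null, config));
    // prints: [org.apache.hadoop:hadoop-client:2.4.0]
    System.out.println(finalCoordinates(ImmutableList.of("org.apache.hadoop:hadoop-client:2.3.0"), config));
    // prints: [org.apache.hadoop:hadoop-client:2.3.0]
  }
}

Whichever list wins is what the loop above feeds to Initialization.getClassLoaderForCoordinates, with the Hadoop dependencies kept last to avoid the jets3t and apache.httpcore version conflicts noted in the comment.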