Merge pull request #605 from metamx/default-hadoop-coordinates-config

Indexing: Make default hadoop coordinates configurable.
This commit is contained in:
fjy 2014-06-17 10:42:48 -06:00
commit f201a0e53b
2 changed files with 37 additions and 15 deletions

View File

@@ -21,57 +21,71 @@ package io.druid.indexing.common.config;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import java.io.File; import java.io.File;
import java.util.List;
public class TaskConfig public class TaskConfig
{ {
@JsonProperty public static List<String> DEFAULT_DEFAULT_HADOOP_COORDINATES = ImmutableList.of(
"org.apache.hadoop:hadoop-client:2.3.0"
);
private final String baseDir; private final String baseDir;
@JsonProperty
private final File baseTaskDir; private final File baseTaskDir;
@JsonProperty
private final String hadoopWorkingPath; private final String hadoopWorkingPath;
@JsonProperty
private final int defaultRowFlushBoundary; private final int defaultRowFlushBoundary;
private final List<String> defaultHadoopCoordinates;
/**
 * Jackson deserialization entry point. Every parameter is optional in the
 * JSON/properties source; each one falls back to a documented default when
 * absent (null).
 *
 * @param baseDir                  base directory; defaults to "/tmp"
 * @param baseTaskDir              per-task persistent dir; defaults to "persistent/task" under baseDir
 * @param hadoopWorkingPath        hadoop indexing working path; defaults to "druid-indexing" under baseDir
 * @param defaultRowFlushBoundary  row flush boundary; defaults to 500000
 * @param defaultHadoopCoordinates hadoop dependency coordinates; defaults to
 *                                 DEFAULT_DEFAULT_HADOOP_COORDINATES
 */
@JsonCreator
public TaskConfig(
    @JsonProperty("baseDir") String baseDir,
    @JsonProperty("baseTaskDir") String baseTaskDir,
    @JsonProperty("hadoopWorkingPath") String hadoopWorkingPath,
    @JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary,
    @JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates
)
{
  this.baseDir = baseDir == null ? "/tmp" : baseDir;
  this.baseTaskDir = new File(defaultDir(baseTaskDir, "persistent/task"));
  this.hadoopWorkingPath = defaultDir(hadoopWorkingPath, "druid-indexing");
  this.defaultRowFlushBoundary = defaultRowFlushBoundary == null ? 500000 : defaultRowFlushBoundary;
  // Null (unset) means "use the build-time default"; an explicitly configured
  // list — even an empty one — is taken as-is.
  this.defaultHadoopCoordinates = defaultHadoopCoordinates == null
                                  ? DEFAULT_DEFAULT_HADOOP_COORDINATES
                                  : defaultHadoopCoordinates;
}
@JsonProperty
public String getBaseDir() public String getBaseDir()
{ {
return baseDir; return baseDir;
} }
@JsonProperty
public File getBaseTaskDir() public File getBaseTaskDir()
{ {
return baseTaskDir; return baseTaskDir;
} }
@JsonProperty
public String getHadoopWorkingPath() public String getHadoopWorkingPath()
{ {
return hadoopWorkingPath; return hadoopWorkingPath;
} }
@JsonProperty
public int getDefaultRowFlushBoundary() public int getDefaultRowFlushBoundary()
{ {
return defaultRowFlushBoundary; return defaultRowFlushBoundary;
} }
// Default hadoop dependency coordinates ("groupId:artifactId:version" strings)
// used by tasks that do not specify their own. Never null: the constructor
// substitutes DEFAULT_DEFAULT_HADOOP_COORDINATES when the property is absent.
@JsonProperty
public List<String> getDefaultHadoopCoordinates()
{
return defaultHadoopCoordinates;
}
private String defaultDir(String configParameter, final String defaultVal) private String defaultDir(String configParameter, final String defaultVal)
{ {
if (configParameter == null) { if (configParameter == null) {
@@ -80,4 +94,4 @@ public class TaskConfig
return configParameter; return configParameter;
} }
} }

View File

@@ -27,6 +27,7 @@ import com.google.api.client.util.Lists;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils; import io.druid.common.utils.JodaUtils;
@@ -65,8 +66,6 @@ public class HadoopIndexTask extends AbstractTask
extensionsConfig = Initialization.makeStartupInjector().getInstance(ExtensionsConfig.class); extensionsConfig = Initialization.makeStartupInjector().getInstance(ExtensionsConfig.class);
} }
public static String DEFAULT_HADOOP_COORDINATES = "org.apache.hadoop:hadoop-client:2.3.0";
private static String getTheDataSource(HadoopIngestionSpec spec, HadoopIngestionSpec config) private static String getTheDataSource(HadoopIngestionSpec spec, HadoopIngestionSpec config)
{ {
if (spec != null) { if (spec != null) {
@@ -115,9 +114,14 @@ public class HadoopIndexTask extends AbstractTask
Preconditions.checkArgument(this.spec.getTuningConfig().getWorkingPath() == null, "workingPath must be absent"); Preconditions.checkArgument(this.spec.getTuningConfig().getWorkingPath() == null, "workingPath must be absent");
Preconditions.checkArgument(this.spec.getIOConfig().getMetadataUpdateSpec() == null, "updaterJobSpec must be absent"); Preconditions.checkArgument(this.spec.getIOConfig().getMetadataUpdateSpec() == null, "updaterJobSpec must be absent");
this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null ? Arrays.<String>asList( if (hadoopDependencyCoordinates != null) {
hadoopCoordinates == null ? DEFAULT_HADOOP_COORDINATES : hadoopCoordinates this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
) : hadoopDependencyCoordinates; } else if (hadoopCoordinates != null) {
this.hadoopDependencyCoordinates = ImmutableList.of(hadoopCoordinates);
} else {
// Will be defaulted to something at runtime, based on taskConfig.
this.hadoopDependencyCoordinates = null;
}
} }
@Override @Override
@@ -158,6 +162,10 @@ public class HadoopIndexTask extends AbstractTask
@Override @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception public TaskStatus run(TaskToolbox toolbox) throws Exception
{ {
final List<String> finalHadoopDependencyCoordinates = hadoopDependencyCoordinates != null
? hadoopDependencyCoordinates
: toolbox.getConfig().getDefaultHadoopCoordinates();
final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig); final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
final List<URL> extensionURLs = Lists.newArrayList(); final List<URL> extensionURLs = Lists.newArrayList();
@@ -174,7 +182,7 @@ public class HadoopIndexTask extends AbstractTask
final List<URL> driverURLs = Lists.newArrayList(); final List<URL> driverURLs = Lists.newArrayList();
driverURLs.addAll(nonHadoopURLs); driverURLs.addAll(nonHadoopURLs);
// put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts // put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
for (String hadoopDependencyCoordinate : hadoopDependencyCoordinates) { for (String hadoopDependencyCoordinate : finalHadoopDependencyCoordinates) {
final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates( final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
aetherClient, hadoopDependencyCoordinate aetherClient, hadoopDependencyCoordinate
); );