mirror of https://github.com/apache/druid.git
Merge pull request #2722 from himanshug/fix_hadoop_jar_upload
config to explicitly specify classpath for hadoop container during hadoop ingestion
commit 9cb197adec
@@ -23,6 +23,7 @@ Many of Druid's external dependencies can be plugged in as modules. Extensions c
|Property|Description|Default|
|--------|-----------|-------|
|`druid.extensions.directory`|The root extension directory where the user can put extension-related files. Druid will load extensions stored under this directory.|`extensions` (This is a relative path to Druid's working directory)|
|`druid.extensions.hadoopDependenciesDir`|The root Hadoop-dependencies directory where the user can put Hadoop-related dependency files. Druid will load the dependencies based on the Hadoop coordinate specified in the Hadoop index task.|`hadoop-dependencies` (This is a relative path to Druid's working directory)|
|`druid.extensions.hadoopContainerDruidClasspath`|Hadoop ingestion launches Hadoop jobs, and this configuration provides a way to explicitly set the user classpath for those jobs. By default it is computed automatically by Druid from the Druid process classpath and the set of loaded extensions. However, sometimes you may want to set it explicitly to resolve dependency conflicts between Druid and Hadoop.|null|
|`druid.extensions.loadList`|A JSON array of extensions for Druid to load from the extension directories. If it is not specified, its value will be `null` and Druid will load all extensions under `druid.extensions.directory`. If its value is the empty list `[]`, then no extensions will be loaded at all.|null|
|`druid.extensions.searchCurrentClassloader`|This is a boolean flag that determines if Druid will search the main classloader for extensions. It defaults to true but can be turned off if you have reason to not automatically add all modules on the classpath.|true|
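
For example, a hypothetical `common.runtime.properties` excerpt that exercises the new property (the extension names and paths below are illustrative, not part of this patch):

```properties
# Load only these two extensions from the extensions directory.
druid.extensions.loadList=["druid-hdfs-storage","mysql-metadata-storage"]

# Explicitly control what Druid puts on the Hadoop container's classpath instead
# of letting Druid compute it from the process classpath. A "*" entry expands to
# all jars in that directory, mirroring Java's classpath wildcard handling.
druid.extensions.hadoopContainerDruidClasspath=/opt/druid/lib/*:/opt/druid/conf
```

Note that `druid.extensions.loadList` is a JSON array, while `druid.extensions.hadoopContainerDruidClasspath` is a plain classpath string joined with the platform path separator.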
@@ -82,6 +82,8 @@ instance of a Druid [overlord](../design/indexing-service.html). A sample task i
|property|description|required?|
|hadoopDependencyCoordinates|A JSON array of Hadoop dependency coordinates that Druid will use; this property overrides the default Hadoop coordinates. Once specified, Druid will look for those Hadoop dependencies under the location specified by `druid.extensions.hadoopDependenciesDir`.|no|
|classpathPrefix|Classpath that will be prepended for the peon process.|no|
Also note that Druid automatically computes the classpath for Hadoop job containers that run in the Hadoop cluster. However, in case of conflicts between Hadoop's and Druid's dependencies, you can manually specify the classpath by setting the `druid.extensions.hadoopContainerDruidClasspath` property. See the extensions config in [base druid configuration](../configuration/index.html).
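
As a sketch, a Hadoop index task that pins specific Hadoop dependency coordinates might start like this (the coordinate version, the prefix jar, and the elided `spec` are illustrative only; a real `spec` must contain the dataSchema, ioConfig, and tuningConfig):

```json
{
  "type": "index_hadoop",
  "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.4.0"],
  "classpathPrefix": "/opt/druid-fixes/custom.jar",
  "spec": {}
}
```

Druid resolves each coordinate against `druid.extensions.hadoopDependenciesDir`, so the matching jars must already be present under that directory.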
### DataSchema
This field is required. See [Ingestion](../ingestion/index.html).
@@ -157,7 +157,21 @@ public abstract class HadoopTask extends AbstractTask
        null
    );

    System.setProperty("druid.hadoop.internal.classpath", Joiner.on(File.pathSeparator).join(jobURLs));

    final String hadoopContainerDruidClasspathJars;
    if (extensionsConfig.getHadoopContainerDruidClasspath() == null) {
      // Default: ship the automatically computed job classpath to the Hadoop container.
      hadoopContainerDruidClasspathJars = Joiner.on(File.pathSeparator).join(jobURLs);
    } else {
      // Explicit override: expand the configured classpath (which may contain "*"
      // wildcard entries) into concrete jar locations.
      hadoopContainerDruidClasspathJars =
          Joiner.on(File.pathSeparator)
                .join(
                    Initialization.getURLsForClasspath(extensionsConfig.getHadoopContainerDruidClasspath())
                );
    }

    log.info("Hadoop Container Druid Classpath is set to [%s]", hadoopContainerDruidClasspathJars);
    System.setProperty("druid.hadoop.internal.classpath", hadoopContainerDruidClasspathJars);

    return classLoader;
  }
@@ -38,6 +38,9 @@ public class ExtensionsConfig
  @JsonProperty
  private String hadoopDependenciesDir = "hadoop-dependencies";

  @JsonProperty
  private String hadoopContainerDruidClasspath = null;

  @JsonProperty
  private List<String> loadList;
@@ -56,6 +59,11 @@ public class ExtensionsConfig
    return hadoopDependenciesDir;
  }

  public String getHadoopContainerDruidClasspath()
  {
    return hadoopContainerDruidClasspath;
  }

  public List<String> getLoadList()
  {
    return loadList;
@@ -68,6 +76,7 @@ public class ExtensionsConfig
           "searchCurrentClassloader=" + searchCurrentClassloader +
           ", directory='" + directory + '\'' +
           ", hadoopDependenciesDir='" + hadoopDependenciesDir + '\'' +
           ", hadoopContainerDruidClasspath='" + hadoopContainerDruidClasspath + '\'' +
           ", loadList=" + loadList +
           '}';
  }
@@ -0,0 +1,82 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.guice;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;

/**
 */
public class ExtensionsConfigTest
{
  @Test
  public void testSerdeWithDefaults() throws Exception
  {
    String json = "{}";
    ObjectMapper mapper = TestHelper.getObjectMapper();

    ExtensionsConfig config = mapper.readValue(
        mapper.writeValueAsString(
            mapper.readValue(json, ExtensionsConfig.class)
        ),
        ExtensionsConfig.class
    );

    Assert.assertTrue(config.searchCurrentClassloader());
    Assert.assertEquals("extensions", config.getDirectory());
    Assert.assertEquals("hadoop-dependencies", config.getHadoopDependenciesDir());
    Assert.assertNull(config.getHadoopContainerDruidClasspath());
    Assert.assertNull(config.getLoadList());
  }

  @Test
  public void testSerdeWithNonDefaults() throws Exception
  {
    String json = "{\n"
                  + "  \"searchCurrentClassloader\": false,\n"
                  + "  \"directory\": \"testExtensions\",\n"
                  + "  \"hadoopDependenciesDir\": \"testHadoopDependenciesDir\",\n"
                  + "  \"hadoopContainerDruidClasspath\": \"testHadoopContainerClasspath\",\n"
                  + "  \"loadList\": [\"a\",\"b\"]\n"
                  + "}";
    ObjectMapper mapper = TestHelper.getObjectMapper();

    ExtensionsConfig config = mapper.readValue(
        mapper.writeValueAsString(
            mapper.readValue(json, ExtensionsConfig.class)
        ),
        ExtensionsConfig.class
    );

    Assert.assertFalse(config.searchCurrentClassloader());
    Assert.assertEquals("testExtensions", config.getDirectory());
    Assert.assertEquals("testHadoopDependenciesDir", config.getHadoopDependenciesDir());
    Assert.assertEquals("testHadoopContainerClasspath", config.getHadoopContainerDruidClasspath());
    Assert.assertEquals(
        ImmutableList.of("a", "b"),
        config.getLoadList()
    );
  }
}
@@ -64,9 +64,12 @@ import org.apache.commons.io.FileUtils;
import org.eclipse.aether.artifact.DefaultArtifact;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -275,6 +278,41 @@ public class Initialization
    return loader;
  }

  public static List<URL> getURLsForClasspath(String cp)
  {
    try {
      String[] paths = cp.split(File.pathSeparator);

      List<URL> urls = new ArrayList<>();
      for (int i = 0; i < paths.length; i++) {
        File f = new File(paths[i]);
        if ("*".equals(f.getName())) {
          // Java-style wildcard entry such as "/some/dir/*": collect every jar
          // directly inside that directory.
          File parentDir = f.getParentFile();
          if (parentDir.isDirectory()) {
            File[] jars = parentDir.listFiles(
                new FilenameFilter()
                {
                  @Override
                  public boolean accept(File dir, String name)
                  {
                    return name != null && (name.endsWith(".jar") || name.endsWith(".JAR"));
                  }
                }
            );
            for (File jar : jars) {
              urls.add(jar.toURI().toURL());
            }
          }
        } else {
          // Plain entry: pass it through as a file URL.
          urls.add(new File(paths[i]).toURI().toURL());
        }
      }
      return urls;
    } catch (IOException ex) {
      throw Throwables.propagate(ex);
    }
  }

  public static Injector makeInjectorWithModules(final Injector baseInjector, Iterable<? extends Module> modules)
  {
    final ModuleList defaultModules = new ModuleList(baseInjector);
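
To make the wildcard semantics above concrete, here is a minimal, hypothetical caller; the class name and paths are invented for illustration, only `Initialization.getURLsForClasspath` comes from this patch, and the import assumes the class lives in `io.druid.initialization`:

```java
import io.druid.initialization.Initialization; // package assumed from the Druid source tree

import java.io.File;
import java.net.URL;
import java.util.List;

public class ClasspathExpansionExample
{
  public static void main(String[] args)
  {
    // "/opt/druid/lib/*" expands to every *.jar / *.JAR directly inside that
    // directory; non-wildcard entries are converted to file URLs as-is.
    String cp = "/opt/druid/lib/*" + File.pathSeparator + "/etc/druid/conf";
    List<URL> urls = Initialization.getURLsForClasspath(cp);
    for (URL url : urls) {
      System.out.println(url);
    }
  }
}
```

The resulting URLs are then joined with the platform path separator and handed to the Hadoop job through the `druid.hadoop.internal.classpath` system property, as shown in the `HadoopTask` hunk above.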
@@ -387,6 +387,46 @@ public class InitializationTest
    Assert.assertArrayEquals(expectedFileList, actualFileList);
  }

  @Test
  public void testGetURLsForClasspath() throws Exception
  {
    File tmpDir1 = temporaryFolder.newFolder();
    File tmpDir2 = temporaryFolder.newFolder();
    File tmpDir3 = temporaryFolder.newFolder();

    File tmpDir1a = new File(tmpDir1, "a.jar");
    tmpDir1a.createNewFile();
    File tmpDir1b = new File(tmpDir1, "b.jar");
    tmpDir1b.createNewFile();
    new File(tmpDir1, "note1.txt").createNewFile();

    File tmpDir2c = new File(tmpDir2, "c.jar");
    tmpDir2c.createNewFile();
    File tmpDir2d = new File(tmpDir2, "d.jar");
    tmpDir2d.createNewFile();
    File tmpDir2e = new File(tmpDir2, "e.JAR");
    tmpDir2e.createNewFile();
    new File(tmpDir2, "note2.txt").createNewFile();

    String cp = tmpDir1.getAbsolutePath() + File.separator + "*"
                + File.pathSeparator
                + tmpDir3.getAbsolutePath()
                + File.pathSeparator
                + tmpDir2.getAbsolutePath() + File.separator + "*";

    List<URL> expected = ImmutableList.<URL>builder()
        .add(tmpDir1a.toURI().toURL())
        .add(tmpDir1b.toURI().toURL())
        .add(tmpDir3.toURI().toURL())
        .add(tmpDir2c.toURI().toURL())
        .add(tmpDir2d.toURI().toURL())
        .add(tmpDir2e.toURI().toURL())
        .build();

    Assert.assertEquals(expected, Initialization.getURLsForClasspath(cp));
  }

  public static class TestDruidModule implements DruidModule
  {
    @Override