mirror of https://github.com/apache/druid.git
Cleanup hadoop dependencies in indexing modules (#11516)
* Remove hadoop-yarn-common dependency (cherry picked from commit d767c8f3d204d9d27d8122d55680c3c9f1cfe473)
* Remove hdfs dependency from druid core
This commit is contained in:
parent
eceacf74c0
commit
1a562f444c
|
@ -106,11 +106,6 @@
|
||||||
<groupId>com.google.inject</groupId>
|
<groupId>com.google.inject</groupId>
|
||||||
<artifactId>guice</artifactId>
|
<artifactId>guice</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.hadoop</groupId>
|
|
||||||
<artifactId>hadoop-hdfs-client</artifactId>
|
|
||||||
<scope>provided</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>javax.validation</groupId>
|
<groupId>javax.validation</groupId>
|
||||||
<artifactId>validation-api</artifactId>
|
<artifactId>validation-api</artifactId>
|
||||||
|
@ -146,13 +141,6 @@
|
||||||
<artifactId>hamcrest-all</artifactId>
|
<artifactId>hamcrest-all</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.hadoop</groupId>
|
|
||||||
<artifactId>hadoop-hdfs</artifactId>
|
|
||||||
<version>${hadoop.compile.version}</version>
|
|
||||||
<classifier>tests</classifier>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-common</artifactId>
|
<artifactId>hadoop-common</artifactId>
|
||||||
|
@ -160,12 +148,6 @@
|
||||||
<classifier>tests</classifier>
|
<classifier>tests</classifier>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.hadoop</groupId>
|
|
||||||
<artifactId>hadoop-hdfs</artifactId>
|
|
||||||
<version>${hadoop.compile.version}</version>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.druid</groupId>
|
<groupId>org.apache.druid</groupId>
|
||||||
<artifactId>druid-core</artifactId>
|
<artifactId>druid-core</artifactId>
|
||||||
|
|
|
@ -29,9 +29,9 @@ import org.apache.druid.java.util.common.FileUtils;
|
||||||
import org.apache.druid.java.util.common.IOE;
|
import org.apache.druid.java.util.common.IOE;
|
||||||
import org.apache.druid.java.util.common.StringUtils;
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -59,7 +59,7 @@ import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
public class HdfsClasspathSetupTest
|
public class HdfsClasspathSetupTest
|
||||||
{
|
{
|
||||||
private static MiniDFSCluster miniCluster;
|
private static FileSystem localFS;
|
||||||
private static File hdfsTmpDir;
|
private static File hdfsTmpDir;
|
||||||
private static Configuration conf;
|
private static Configuration conf;
|
||||||
private static String dummyJarString = "This is a test jar file.";
|
private static String dummyJarString = "This is a test jar file.";
|
||||||
|
@ -77,8 +77,8 @@ public class HdfsClasspathSetupTest
|
||||||
throw new IOE("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath());
|
throw new IOE("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath());
|
||||||
}
|
}
|
||||||
conf = new Configuration(true);
|
conf = new Configuration(true);
|
||||||
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath());
|
localFS = new LocalFileSystem();
|
||||||
miniCluster = new MiniDFSCluster.Builder(conf).build();
|
localFS.initialize(hdfsTmpDir.toURI(), conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -98,9 +98,6 @@ public class HdfsClasspathSetupTest
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDownStatic() throws IOException
|
public static void tearDownStatic() throws IOException
|
||||||
{
|
{
|
||||||
if (miniCluster != null) {
|
|
||||||
miniCluster.shutdown(true);
|
|
||||||
}
|
|
||||||
FileUtils.deleteDirectory(hdfsTmpDir);
|
FileUtils.deleteDirectory(hdfsTmpDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,41 +106,39 @@ public class HdfsClasspathSetupTest
|
||||||
{
|
{
|
||||||
dummyJarFile.delete();
|
dummyJarFile.delete();
|
||||||
Assert.assertFalse(dummyJarFile.exists());
|
Assert.assertFalse(dummyJarFile.exists());
|
||||||
miniCluster.getFileSystem().delete(finalClasspath, true);
|
localFS.delete(finalClasspath, true);
|
||||||
Assert.assertFalse(miniCluster.getFileSystem().exists(finalClasspath));
|
Assert.assertFalse(localFS.exists(finalClasspath));
|
||||||
miniCluster.getFileSystem().delete(intermediatePath, true);
|
localFS.delete(intermediatePath, true);
|
||||||
Assert.assertFalse(miniCluster.getFileSystem().exists(intermediatePath));
|
Assert.assertFalse(localFS.exists(intermediatePath));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAddSnapshotJarToClasspath() throws IOException
|
public void testAddSnapshotJarToClasspath() throws IOException
|
||||||
{
|
{
|
||||||
Job job = Job.getInstance(conf, "test-job");
|
Job job = Job.getInstance(conf, "test-job");
|
||||||
DistributedFileSystem fs = miniCluster.getFileSystem();
|
|
||||||
Path intermediatePath = new Path("/tmp/classpath");
|
Path intermediatePath = new Path("/tmp/classpath");
|
||||||
JobHelper.addSnapshotJarToClassPath(dummyJarFile, intermediatePath, fs, job);
|
JobHelper.addSnapshotJarToClassPath(dummyJarFile, intermediatePath, localFS, job);
|
||||||
Path expectedJarPath = new Path(intermediatePath, dummyJarFile.getName());
|
Path expectedJarPath = new Path(intermediatePath, dummyJarFile.getName());
|
||||||
// check file gets uploaded to HDFS
|
// check file gets uploaded to HDFS
|
||||||
Assert.assertTrue(fs.exists(expectedJarPath));
|
Assert.assertTrue(localFS.exists(expectedJarPath));
|
||||||
// check file gets added to the classpath
|
// check file gets added to the classpath
|
||||||
Assert.assertEquals(expectedJarPath.toString(), job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES));
|
Assert.assertEquals(expectedJarPath.toString(), job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES));
|
||||||
Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(fs.open(expectedJarPath))));
|
Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(localFS.open(expectedJarPath))));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAddNonSnapshotJarToClasspath() throws IOException
|
public void testAddNonSnapshotJarToClasspath() throws IOException
|
||||||
{
|
{
|
||||||
Job job = Job.getInstance(conf, "test-job");
|
Job job = Job.getInstance(conf, "test-job");
|
||||||
DistributedFileSystem fs = miniCluster.getFileSystem();
|
JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePath, localFS, job);
|
||||||
JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePath, fs, job);
|
|
||||||
Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName());
|
Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName());
|
||||||
// check file gets uploaded to final HDFS path
|
// check file gets uploaded to final HDFS path
|
||||||
Assert.assertTrue(fs.exists(expectedJarPath));
|
Assert.assertTrue(localFS.exists(expectedJarPath));
|
||||||
// check that the intermediate file gets deleted
|
// check that the intermediate file gets deleted
|
||||||
Assert.assertFalse(fs.exists(new Path(intermediatePath, dummyJarFile.getName())));
|
Assert.assertFalse(localFS.exists(new Path(intermediatePath, dummyJarFile.getName())));
|
||||||
// check file gets added to the classpath
|
// check file gets added to the classpath
|
||||||
Assert.assertEquals(expectedJarPath.toString(), job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES));
|
Assert.assertEquals(expectedJarPath.toString(), job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES));
|
||||||
Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(fs.open(expectedJarPath))));
|
Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(localFS.open(expectedJarPath))));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -159,13 +154,12 @@ public class HdfsClasspathSetupTest
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConcurrentUpload() throws IOException, InterruptedException, ExecutionException, TimeoutException
|
public void testConcurrentUpload() throws InterruptedException, ExecutionException, TimeoutException
|
||||||
{
|
{
|
||||||
final int concurrency = 10;
|
final int concurrency = 10;
|
||||||
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(concurrency));
|
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(concurrency));
|
||||||
// barrier ensures that all jobs try to add files to classpath at same time.
|
// barrier ensures that all jobs try to add files to classpath at same time.
|
||||||
final CyclicBarrier barrier = new CyclicBarrier(concurrency);
|
final CyclicBarrier barrier = new CyclicBarrier(concurrency);
|
||||||
final DistributedFileSystem fs = miniCluster.getFileSystem();
|
|
||||||
final Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName());
|
final Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName());
|
||||||
List<ListenableFuture<Boolean>> futures = new ArrayList<>();
|
List<ListenableFuture<Boolean>> futures = new ArrayList<>();
|
||||||
|
|
||||||
|
@ -180,11 +174,11 @@ public class HdfsClasspathSetupTest
|
||||||
int id = barrier.await();
|
int id = barrier.await();
|
||||||
Job job = Job.getInstance(conf, "test-job-" + id);
|
Job job = Job.getInstance(conf, "test-job-" + id);
|
||||||
Path intermediatePathForJob = new Path(intermediatePath, "job-" + id);
|
Path intermediatePathForJob = new Path(intermediatePath, "job-" + id);
|
||||||
JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePathForJob, fs, job);
|
JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePathForJob, localFS, job);
|
||||||
// check file gets uploaded to final HDFS path
|
// check file gets uploaded to final HDFS path
|
||||||
Assert.assertTrue(fs.exists(expectedJarPath));
|
Assert.assertTrue(localFS.exists(expectedJarPath));
|
||||||
// check that the intermediate file is not present
|
// check that the intermediate file is not present
|
||||||
Assert.assertFalse(fs.exists(new Path(intermediatePathForJob, dummyJarFile.getName())));
|
Assert.assertFalse(localFS.exists(new Path(intermediatePathForJob, dummyJarFile.getName())));
|
||||||
// check file gets added to the classpath
|
// check file gets added to the classpath
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
expectedJarPath.toString(),
|
expectedJarPath.toString(),
|
||||||
|
|
|
@ -160,11 +160,6 @@
|
||||||
<groupId>javax.validation</groupId>
|
<groupId>javax.validation</groupId>
|
||||||
<artifactId>validation-api</artifactId>
|
<artifactId>validation-api</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.hadoop</groupId>
|
|
||||||
<artifactId>hadoop-yarn-common</artifactId>
|
|
||||||
<scope>provided</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>javax.servlet</groupId>
|
<groupId>javax.servlet</groupId>
|
||||||
<artifactId>servlet-api</artifactId>
|
<artifactId>servlet-api</artifactId>
|
||||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.druid.java.util.common.StringUtils;
|
||||||
import org.apache.druid.java.util.common.granularity.Granularity;
|
import org.apache.druid.java.util.common.granularity.Granularity;
|
||||||
import org.apache.druid.timeline.DataSegment;
|
import org.apache.druid.timeline.DataSegment;
|
||||||
import org.apache.druid.utils.JvmUtils;
|
import org.apache.druid.utils.JvmUtils;
|
||||||
import org.apache.hadoop.yarn.util.ApplicationClassLoader;
|
import org.apache.hadoop.util.ApplicationClassLoader;
|
||||||
import org.easymock.EasyMock;
|
import org.easymock.EasyMock;
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
Loading…
Reference in New Issue