From 1a562f444c4e349348f7d909720eb5f368ecb8ef Mon Sep 17 00:00:00 2001 From: Rohan Garg <7731512+rohangarg@users.noreply.github.com> Date: Wed, 4 Aug 2021 06:26:54 +0530 Subject: [PATCH] Cleanup hadoop dependencies in indexing modules (#11516) * Remove hadoop-yarn-common dependency (cherry picked from commit d767c8f3d204d9d27d8122d55680c3c9f1cfe473) * Remove hdfs dependency from druid core --- indexing-hadoop/pom.xml | 18 -------- .../druid/indexer/HdfsClasspathSetupTest.java | 46 ++++++++----------- indexing-service/pom.xml | 5 -- .../indexing/common/task/HadoopTaskTest.java | 2 +- 4 files changed, 21 insertions(+), 50 deletions(-) diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml index 8eacc7e2220..21d4abbd5a9 100644 --- a/indexing-hadoop/pom.xml +++ b/indexing-hadoop/pom.xml @@ -106,11 +106,6 @@ <groupId>com.google.inject</groupId> <artifactId>guice</artifactId> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs-client</artifactId> - <scope>provided</scope> - </dependency> <dependency> <groupId>javax.validation</groupId> <artifactId>validation-api</artifactId> @@ -146,13 +141,6 @@ <artifactId>hamcrest-all</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.compile.version}</version> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> @@ -160,12 +148,6 @@ <classifier>tests</classifier> <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.compile.version}</version> - <scope>test</scope> - </dependency> <dependency> <groupId>org.apache.druid</groupId> <artifactId>druid-core</artifactId> diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java index f524a97ae59..292f683f9a4 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java @@ -29,9 +29,9 @@ import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import 
org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig; import org.junit.After; @@ -59,7 +59,7 @@ import java.util.concurrent.TimeoutException; public class HdfsClasspathSetupTest { - private static MiniDFSCluster miniCluster; + private static FileSystem localFS; private static File hdfsTmpDir; private static Configuration conf; private static String dummyJarString = "This is a test jar file."; @@ -77,8 +77,8 @@ public class HdfsClasspathSetupTest throw new IOE("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath()); } conf = new Configuration(true); - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath()); - miniCluster = new MiniDFSCluster.Builder(conf).build(); + localFS = new LocalFileSystem(); + localFS.initialize(hdfsTmpDir.toURI(), conf); } @Before @@ -98,9 +98,6 @@ public class HdfsClasspathSetupTest @AfterClass public static void tearDownStatic() throws IOException { - if (miniCluster != null) { - miniCluster.shutdown(true); - } FileUtils.deleteDirectory(hdfsTmpDir); } @@ -109,41 +106,39 @@ public class HdfsClasspathSetupTest { dummyJarFile.delete(); Assert.assertFalse(dummyJarFile.exists()); - miniCluster.getFileSystem().delete(finalClasspath, true); - Assert.assertFalse(miniCluster.getFileSystem().exists(finalClasspath)); - miniCluster.getFileSystem().delete(intermediatePath, true); - Assert.assertFalse(miniCluster.getFileSystem().exists(intermediatePath)); + localFS.delete(finalClasspath, true); + Assert.assertFalse(localFS.exists(finalClasspath)); + localFS.delete(intermediatePath, true); + Assert.assertFalse(localFS.exists(intermediatePath)); } @Test public void testAddSnapshotJarToClasspath() throws IOException { Job job = Job.getInstance(conf, "test-job"); - DistributedFileSystem fs = miniCluster.getFileSystem(); Path intermediatePath = new Path("/tmp/classpath"); - 
JobHelper.addSnapshotJarToClassPath(dummyJarFile, intermediatePath, fs, job); + JobHelper.addSnapshotJarToClassPath(dummyJarFile, intermediatePath, localFS, job); Path expectedJarPath = new Path(intermediatePath, dummyJarFile.getName()); // check file gets uploaded to HDFS - Assert.assertTrue(fs.exists(expectedJarPath)); + Assert.assertTrue(localFS.exists(expectedJarPath)); // check file gets added to the classpath Assert.assertEquals(expectedJarPath.toString(), job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES)); - Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(fs.open(expectedJarPath)))); + Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(localFS.open(expectedJarPath)))); } @Test public void testAddNonSnapshotJarToClasspath() throws IOException { Job job = Job.getInstance(conf, "test-job"); - DistributedFileSystem fs = miniCluster.getFileSystem(); - JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePath, fs, job); + JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePath, localFS, job); Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName()); // check file gets uploaded to final HDFS path - Assert.assertTrue(fs.exists(expectedJarPath)); + Assert.assertTrue(localFS.exists(expectedJarPath)); // check that the intermediate file gets deleted - Assert.assertFalse(fs.exists(new Path(intermediatePath, dummyJarFile.getName()))); + Assert.assertFalse(localFS.exists(new Path(intermediatePath, dummyJarFile.getName()))); // check file gets added to the classpath Assert.assertEquals(expectedJarPath.toString(), job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES)); - Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(fs.open(expectedJarPath)))); + Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(localFS.open(expectedJarPath)))); } @Test @@ -159,13 +154,12 @@ public class HdfsClasspathSetupTest } @Test - 
public void testConcurrentUpload() throws IOException, InterruptedException, ExecutionException, TimeoutException + public void testConcurrentUpload() throws InterruptedException, ExecutionException, TimeoutException { final int concurrency = 10; ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(concurrency)); // barrier ensures that all jobs try to add files to classpath at same time. final CyclicBarrier barrier = new CyclicBarrier(concurrency); - final DistributedFileSystem fs = miniCluster.getFileSystem(); final Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName()); List<ListenableFuture<Boolean>> futures = new ArrayList<>(); @@ -180,11 +174,11 @@ public class HdfsClasspathSetupTest int id = barrier.await(); Job job = Job.getInstance(conf, "test-job-" + id); Path intermediatePathForJob = new Path(intermediatePath, "job-" + id); - JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePathForJob, fs, job); + JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePathForJob, localFS, job); // check file gets uploaded to final HDFS path - Assert.assertTrue(fs.exists(expectedJarPath)); + Assert.assertTrue(localFS.exists(expectedJarPath)); // check that the intermediate file is not present - Assert.assertFalse(fs.exists(new Path(intermediatePathForJob, dummyJarFile.getName()))); + Assert.assertFalse(localFS.exists(new Path(intermediatePathForJob, dummyJarFile.getName()))); // check file gets added to the classpath Assert.assertEquals( expectedJarPath.toString(), diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml index a711d98e137..417df678fb7 100644 --- a/indexing-service/pom.xml +++ b/indexing-service/pom.xml @@ -160,11 +160,6 @@ <groupId>javax.validation</groupId> <artifactId>validation-api</artifactId> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <scope>provided</scope> - </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java index 97d6df15b32..f08e3f73aa2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java @@ -30,7 +30,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.timeline.DataSegment; import org.apache.druid.utils.JvmUtils; -import org.apache.hadoop.yarn.util.ApplicationClassLoader; +import org.apache.hadoop.util.ApplicationClassLoader; import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.Assert;