diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 8eacc7e2220..21d4abbd5a9 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -106,11 +106,6 @@
com.google.inject
guice
-
- org.apache.hadoop
- hadoop-hdfs-client
- provided
-
javax.validation
validation-api
@@ -146,13 +141,6 @@
hamcrest-all
test
-
- org.apache.hadoop
- hadoop-hdfs
- ${hadoop.compile.version}
- tests
- test
-
org.apache.hadoop
hadoop-common
@@ -160,12 +148,6 @@
tests
test
-
- org.apache.hadoop
- hadoop-hdfs
- ${hadoop.compile.version}
- test
-
org.apache.druid
druid-core
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java
index f524a97ae59..292f683f9a4 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java
@@ -29,9 +29,9 @@ import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.junit.After;
@@ -59,7 +59,7 @@ import java.util.concurrent.TimeoutException;
public class HdfsClasspathSetupTest
{
- private static MiniDFSCluster miniCluster;
+ private static FileSystem localFS;
private static File hdfsTmpDir;
private static Configuration conf;
private static String dummyJarString = "This is a test jar file.";
@@ -77,8 +77,8 @@ public class HdfsClasspathSetupTest
throw new IOE("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath());
}
conf = new Configuration(true);
- conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath());
- miniCluster = new MiniDFSCluster.Builder(conf).build();
+ localFS = new LocalFileSystem();
+ localFS.initialize(hdfsTmpDir.toURI(), conf);
}
@Before
@@ -98,9 +98,6 @@ public class HdfsClasspathSetupTest
@AfterClass
public static void tearDownStatic() throws IOException
{
- if (miniCluster != null) {
- miniCluster.shutdown(true);
- }
FileUtils.deleteDirectory(hdfsTmpDir);
}
@@ -109,41 +106,39 @@ public class HdfsClasspathSetupTest
{
dummyJarFile.delete();
Assert.assertFalse(dummyJarFile.exists());
- miniCluster.getFileSystem().delete(finalClasspath, true);
- Assert.assertFalse(miniCluster.getFileSystem().exists(finalClasspath));
- miniCluster.getFileSystem().delete(intermediatePath, true);
- Assert.assertFalse(miniCluster.getFileSystem().exists(intermediatePath));
+ localFS.delete(finalClasspath, true);
+ Assert.assertFalse(localFS.exists(finalClasspath));
+ localFS.delete(intermediatePath, true);
+ Assert.assertFalse(localFS.exists(intermediatePath));
}
@Test
public void testAddSnapshotJarToClasspath() throws IOException
{
Job job = Job.getInstance(conf, "test-job");
- DistributedFileSystem fs = miniCluster.getFileSystem();
Path intermediatePath = new Path("/tmp/classpath");
- JobHelper.addSnapshotJarToClassPath(dummyJarFile, intermediatePath, fs, job);
+ JobHelper.addSnapshotJarToClassPath(dummyJarFile, intermediatePath, localFS, job);
Path expectedJarPath = new Path(intermediatePath, dummyJarFile.getName());
// check file gets uploaded to HDFS
- Assert.assertTrue(fs.exists(expectedJarPath));
+ Assert.assertTrue(localFS.exists(expectedJarPath));
// check file gets added to the classpath
Assert.assertEquals(expectedJarPath.toString(), job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES));
- Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(fs.open(expectedJarPath))));
+ Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(localFS.open(expectedJarPath))));
}
@Test
public void testAddNonSnapshotJarToClasspath() throws IOException
{
Job job = Job.getInstance(conf, "test-job");
- DistributedFileSystem fs = miniCluster.getFileSystem();
- JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePath, fs, job);
+ JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePath, localFS, job);
Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName());
// check file gets uploaded to final HDFS path
- Assert.assertTrue(fs.exists(expectedJarPath));
+ Assert.assertTrue(localFS.exists(expectedJarPath));
// check that the intermediate file gets deleted
- Assert.assertFalse(fs.exists(new Path(intermediatePath, dummyJarFile.getName())));
+ Assert.assertFalse(localFS.exists(new Path(intermediatePath, dummyJarFile.getName())));
// check file gets added to the classpath
Assert.assertEquals(expectedJarPath.toString(), job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES));
- Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(fs.open(expectedJarPath))));
+ Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(localFS.open(expectedJarPath))));
}
@Test
@@ -159,13 +154,12 @@ public class HdfsClasspathSetupTest
}
@Test
- public void testConcurrentUpload() throws IOException, InterruptedException, ExecutionException, TimeoutException
+ public void testConcurrentUpload() throws InterruptedException, ExecutionException, TimeoutException
{
final int concurrency = 10;
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(concurrency));
// barrier ensures that all jobs try to add files to classpath at same time.
final CyclicBarrier barrier = new CyclicBarrier(concurrency);
- final DistributedFileSystem fs = miniCluster.getFileSystem();
final Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName());
List> futures = new ArrayList<>();
@@ -180,11 +174,11 @@ public class HdfsClasspathSetupTest
int id = barrier.await();
Job job = Job.getInstance(conf, "test-job-" + id);
Path intermediatePathForJob = new Path(intermediatePath, "job-" + id);
- JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePathForJob, fs, job);
+ JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePathForJob, localFS, job);
// check file gets uploaded to final HDFS path
- Assert.assertTrue(fs.exists(expectedJarPath));
+ Assert.assertTrue(localFS.exists(expectedJarPath));
// check that the intermediate file is not present
- Assert.assertFalse(fs.exists(new Path(intermediatePathForJob, dummyJarFile.getName())));
+ Assert.assertFalse(localFS.exists(new Path(intermediatePathForJob, dummyJarFile.getName())));
// check file gets added to the classpath
Assert.assertEquals(
expectedJarPath.toString(),
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index a711d98e137..417df678fb7 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -160,11 +160,6 @@
javax.validation
validation-api
-
- org.apache.hadoop
- hadoop-yarn-common
- provided
-
javax.servlet
servlet-api
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
index 97d6df15b32..f08e3f73aa2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
@@ -30,7 +30,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.JvmUtils;
-import org.apache.hadoop.yarn.util.ApplicationClassLoader;
+import org.apache.hadoop.util.ApplicationClassLoader;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;