diff --git a/extensions-core/hdfs-storage/pom.xml b/extensions-core/hdfs-storage/pom.xml index 8bd9c251b0e..d16712d8eae 100644 --- a/extensions-core/hdfs-storage/pom.xml +++ b/extensions-core/hdfs-storage/pom.xml @@ -372,13 +372,6 @@ - - org.apache.hadoop - hadoop-common - ${hadoop.compile.version} - tests - test - junit junit @@ -397,19 +390,6 @@ test-jar test - - org.apache.hadoop - hadoop-hdfs - ${hadoop.compile.version} - tests - test - - - org.apache.hadoop - hadoop-hdfs - ${hadoop.compile.version} - test - org.apache.druid druid-indexing-hadoop diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java index 96a99ad6fdf..4322e199f71 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java @@ -39,8 +39,9 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.storage.hdfs.HdfsStorageDruidModule; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -246,7 +247,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest public static class ReaderTest { - private static final String PATH = "/test"; + private static final String PATH = "test"; private static final int NUM_FILE = 3; private static final String KEY_VALUE_SEPARATOR = ","; private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz"; @@ -254,7 +255,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); - private MiniDFSCluster dfsCluster; + private FileSystem fileSystem; private HdfsInputSource target; private Set paths; private Map timestampToValue; @@ -266,8 +267,9 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest File dir = temporaryFolder.getRoot(); Configuration configuration = new Configuration(true); - configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dir.getAbsolutePath()); - dfsCluster = new MiniDFSCluster.Builder(configuration).build(); + fileSystem = new LocalFileSystem(); + fileSystem.initialize(dir.toURI(), configuration); + fileSystem.setWorkingDirectory(new Path(dir.getAbsolutePath())); paths = IntStream.range(0, NUM_FILE) .mapToObj( @@ -275,7 +277,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest char value = ALPHABET.charAt(i % ALPHABET.length()); timestampToValue.put((long) i, Character.toString(value)); return createFile( - dfsCluster, + fileSystem, String.valueOf(i), i + KEY_VALUE_SEPARATOR + value ); @@ -284,30 +286,29 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest .collect(Collectors.toSet()); target = HdfsInputSource.builder() - .paths(dfsCluster.getURI() + PATH + "*") + .paths(fileSystem.makeQualified(new Path(PATH)) + "*") .configuration(CONFIGURATION) - .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG) + .inputSourceConfig(new HdfsInputSourceConfig(ImmutableSet.of("hdfs", "file"))) .build(); } @After - public void teardown() + public void teardown() throws IOException { - if (dfsCluster != null) { - dfsCluster.shutdown(true); - } + temporaryFolder.delete(); + fileSystem.close(); } - private static Path createFile(MiniDFSCluster dfsCluster, String pathSuffix, String contents) + private static Path createFile(FileSystem fs, String pathSuffix, String contents) { try { Path path = new Path(PATH + pathSuffix); try (Writer writer = new BufferedWriter( - new OutputStreamWriter(dfsCluster.getFileSystem().create(path), StandardCharsets.UTF_8) + new OutputStreamWriter(fs.create(path), StandardCharsets.UTF_8) )) { writer.write(contents); } - return path; + return fs.makeQualified(path); } catch (IOException e) { throw new UncheckedIOException(e); @@ -339,7 +340,6 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest splits.forEach(split -> Assert.assertEquals(1, split.get().size())); Set actualPaths = splits.stream() .flatMap(split -> split.get().stream()) - .map(Path::getPathWithoutSchemeAndAuthority) .collect(Collectors.toSet()); Assert.assertEquals(paths, actualPaths); } diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/segment/loading/HdfsFileTimestampVersionFinderTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/segment/loading/HdfsFileTimestampVersionFinderTest.java index 1ccf8e63c88..b6f0add996f 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/segment/loading/HdfsFileTimestampVersionFinderTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/segment/loading/HdfsFileTimestampVersionFinderTest.java @@ -25,8 +25,9 @@ import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.storage.hdfs.HdfsFileTimestampVersionFinder; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -45,10 +46,10 @@ import java.util.regex.Pattern; public class HdfsFileTimestampVersionFinderTest { - private static MiniDFSCluster miniCluster; + private static FileSystem fileSystem; private static File hdfsTmpDir; - private static Path filePath = new Path("/tmp/foo"); - private static Path perTestPath = new Path("/tmp/tmp2"); + private static Path filePath = new Path("tmp1/foo"); + private static Path perTestPath = new Path("tmp1/tmp2"); private static String pathContents = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum"; private static byte[] pathByteContents = StringUtils.toUtf8(pathContents); private static Configuration conf; @@ -61,14 +62,15 @@ public class HdfsFileTimestampVersionFinderTest throw new IOE("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath()); } conf = new Configuration(true); - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath()); - miniCluster = new MiniDFSCluster.Builder(conf).build(); + fileSystem = new LocalFileSystem(); + fileSystem.initialize(hdfsTmpDir.toURI(), conf); + fileSystem.setWorkingDirectory(new Path(hdfsTmpDir.toURI())); final File tmpFile = File.createTempFile("hdfsHandlerTest", ".data"); tmpFile.delete(); try { Files.copy(new ByteArrayInputStream(pathByteContents), tmpFile.toPath()); - try (OutputStream stream = miniCluster.getFileSystem().create(filePath)) { + try (OutputStream stream = fileSystem.create(filePath)) { Files.copy(tmpFile.toPath(), stream); } } @@ -80,10 +82,8 @@ public class HdfsFileTimestampVersionFinderTest @AfterClass public static void tearDownStatic() throws IOException { - if (miniCluster != null) { - miniCluster.shutdown(true); - } FileUtils.deleteDirectory(hdfsTmpDir); + fileSystem.close(); } @@ -98,7 +98,7 @@ public class HdfsFileTimestampVersionFinderTest @After public void tearDown() throws IOException { - miniCluster.getFileSystem().delete(perTestPath, true); + fileSystem.delete(perTestPath, true); } @@ -106,8 +106,8 @@ public class HdfsFileTimestampVersionFinderTest public void testSimpleLatestVersion() throws IOException, InterruptedException { final Path oldPath = new Path(perTestPath, "555test.txt"); - Assert.assertFalse(miniCluster.getFileSystem().exists(oldPath)); - try (final OutputStream outputStream = miniCluster.getFileSystem().create(oldPath); + Assert.assertFalse(fileSystem.exists(oldPath)); + try (final OutputStream outputStream = fileSystem.create(oldPath); final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) { ByteStreams.copy(inputStream, outputStream); } @@ -115,21 +115,23 @@ public class HdfsFileTimestampVersionFinderTest Thread.sleep(10); final Path newPath = new Path(perTestPath, "666test.txt"); - Assert.assertFalse(miniCluster.getFileSystem().exists(newPath)); - try (final OutputStream outputStream = miniCluster.getFileSystem().create(newPath); + Assert.assertFalse(fileSystem.exists(newPath)); + try (final OutputStream outputStream = fileSystem.create(newPath); final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) { ByteStreams.copy(inputStream, outputStream); } - Assert.assertEquals(newPath.toString(), finder.getLatestVersion(oldPath.toUri(), Pattern.compile(".*")).getPath()); + Assert.assertEquals( + fileSystem.makeQualified(newPath).toUri(), + finder.getLatestVersion(fileSystem.makeQualified(oldPath).toUri(), Pattern.compile(".*"))); } @Test public void testAlreadyLatestVersion() throws IOException, InterruptedException { final Path oldPath = new Path(perTestPath, "555test.txt"); - Assert.assertFalse(miniCluster.getFileSystem().exists(oldPath)); - try (final OutputStream outputStream = miniCluster.getFileSystem().create(oldPath); + Assert.assertFalse(fileSystem.exists(oldPath)); + try (final OutputStream outputStream = fileSystem.create(oldPath); final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) { ByteStreams.copy(inputStream, outputStream); } @@ -137,29 +139,31 @@ public class HdfsFileTimestampVersionFinderTest Thread.sleep(10); final Path newPath = new Path(perTestPath, "666test.txt"); - Assert.assertFalse(miniCluster.getFileSystem().exists(newPath)); - try (final OutputStream outputStream = miniCluster.getFileSystem().create(newPath); + Assert.assertFalse(fileSystem.exists(newPath)); + try (final OutputStream outputStream = fileSystem.create(newPath); final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) { ByteStreams.copy(inputStream, outputStream); } - Assert.assertEquals(newPath.toString(), finder.getLatestVersion(newPath.toUri(), Pattern.compile(".*")).getPath()); + Assert.assertEquals( + fileSystem.makeQualified(newPath).toUri(), + finder.getLatestVersion(fileSystem.makeQualified(newPath).toUri(), Pattern.compile(".*"))); } @Test public void testNoLatestVersion() throws IOException { final Path oldPath = new Path(perTestPath, "555test.txt"); - Assert.assertFalse(miniCluster.getFileSystem().exists(oldPath)); - Assert.assertNull(finder.getLatestVersion(oldPath.toUri(), Pattern.compile(".*"))); + Assert.assertFalse(fileSystem.exists(oldPath)); + Assert.assertNull(finder.getLatestVersion(fileSystem.makeQualified(oldPath).toUri(), Pattern.compile(".*"))); } @Test public void testSimpleLatestVersionInDir() throws IOException, InterruptedException { final Path oldPath = new Path(perTestPath, "555test.txt"); - Assert.assertFalse(miniCluster.getFileSystem().exists(oldPath)); - try (final OutputStream outputStream = miniCluster.getFileSystem().create(oldPath); + Assert.assertFalse(fileSystem.exists(oldPath)); + try (final OutputStream outputStream = fileSystem.create(oldPath); final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) { ByteStreams.copy(inputStream, outputStream); } @@ -167,24 +171,23 @@ public class HdfsFileTimestampVersionFinderTest Thread.sleep(10); final Path newPath = new Path(perTestPath, "666test.txt"); - Assert.assertFalse(miniCluster.getFileSystem().exists(newPath)); - try (final OutputStream outputStream = miniCluster.getFileSystem().create(newPath); + Assert.assertFalse(fileSystem.exists(newPath)); + try (final OutputStream outputStream = fileSystem.create(newPath); final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) { ByteStreams.copy(inputStream, outputStream); } Assert.assertEquals( - newPath.toString(), - finder.getLatestVersion(perTestPath.toUri(), Pattern.compile(".*test\\.txt")).getPath() - ); + fileSystem.makeQualified(newPath).toUri(), + finder.getLatestVersion(fileSystem.makeQualified(perTestPath).toUri(), Pattern.compile(".*test\\.txt"))); } @Test public void testSkipMismatch() throws IOException, InterruptedException { final Path oldPath = new Path(perTestPath, "555test.txt"); - Assert.assertFalse(miniCluster.getFileSystem().exists(oldPath)); - try (final OutputStream outputStream = miniCluster.getFileSystem().create(oldPath); + Assert.assertFalse(fileSystem.exists(oldPath)); + try (final OutputStream outputStream = fileSystem.create(oldPath); final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) { ByteStreams.copy(inputStream, outputStream); } @@ -192,15 +195,14 @@ public class HdfsFileTimestampVersionFinderTest Thread.sleep(10); final Path newPath = new Path(perTestPath, "666test.txt2"); - Assert.assertFalse(miniCluster.getFileSystem().exists(newPath)); - try (final OutputStream outputStream = miniCluster.getFileSystem().create(newPath); + Assert.assertFalse(fileSystem.exists(newPath)); + try (final OutputStream outputStream = fileSystem.create(newPath); final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) { ByteStreams.copy(inputStream, outputStream); } Assert.assertEquals( - oldPath.toString(), - finder.getLatestVersion(perTestPath.toUri(), Pattern.compile(".*test\\.txt")).getPath() - ); + fileSystem.makeQualified(oldPath).toUri(), + finder.getLatestVersion(fileSystem.makeQualified(perTestPath).toUri(), Pattern.compile(".*test\\.txt"))); } } diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPullerTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPullerTest.java index ad14399767f..86328c9b0f2 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPullerTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPullerTest.java @@ -26,8 +26,9 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.utils.CompressionUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -41,7 +42,6 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.net.URI; import java.nio.file.Files; import java.util.zip.GZIPOutputStream; @@ -50,11 +50,10 @@ import java.util.zip.GZIPOutputStream; */ public class HdfsDataSegmentPullerTest { - private static MiniDFSCluster miniCluster; + private static FileSystem fileSystem; private static File hdfsTmpDir; - private static URI uriBase; - private static Path filePath = new Path("/tmp/foo"); - private static Path perTestPath = new Path("/tmp/tmp2"); + private static Path filePath = new Path("tmp/foo"); + private static Path perTestPath = new Path("tmp/tmp2"); private static String pathContents = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum"; private static byte[] pathByteContents = StringUtils.toUtf8(pathContents); private static Configuration conf; @@ -67,15 +66,15 @@ public class HdfsDataSegmentPullerTest throw new IOE("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath()); } conf = new Configuration(true); - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath()); - miniCluster = new MiniDFSCluster.Builder(conf).build(); - uriBase = miniCluster.getURI(0); + fileSystem = new LocalFileSystem(); + fileSystem.initialize(hdfsTmpDir.toURI(), conf); + fileSystem.setWorkingDirectory(new Path(hdfsTmpDir.toURI())); final File tmpFile = File.createTempFile("hdfsHandlerTest", ".data"); tmpFile.delete(); try { Files.copy(new ByteArrayInputStream(pathByteContents), tmpFile.toPath()); - try (OutputStream stream = miniCluster.getFileSystem().create(filePath)) { + try (OutputStream stream = fileSystem.create(filePath)) { Files.copy(tmpFile.toPath(), stream); } } @@ -87,10 +86,8 @@ public class HdfsDataSegmentPullerTest @AfterClass public static void tearDownStatic() throws IOException { - if (miniCluster != null) { - miniCluster.shutdown(true); - } FileUtils.deleteDirectory(hdfsTmpDir); + fileSystem.close(); } @@ -105,7 +102,7 @@ public class HdfsDataSegmentPullerTest @After public void tearDown() throws IOException { - miniCluster.getFileSystem().delete(perTestPath, true); + fileSystem.delete(perTestPath, true); } @Test @@ -118,8 +115,6 @@ public class HdfsDataSegmentPullerTest final File outTmpDir = FileUtils.createTempDir(); - final URI uri = URI.create(uriBase.toString() + zipPath); - try (final OutputStream stream = new FileOutputStream(tmpFile)) { ByteStreams.copy(new ByteArrayInputStream(pathByteContents), stream); } @@ -128,12 +123,12 @@ public class HdfsDataSegmentPullerTest final File outFile = new File(outTmpDir, tmpFile.getName()); outFile.delete(); - try (final OutputStream stream = miniCluster.getFileSystem().create(zipPath)) { + try (final OutputStream stream = fileSystem.create(zipPath)) { CompressionUtils.zip(tmpDir, stream); } try { Assert.assertFalse(outFile.exists()); - puller.getSegmentFiles(new Path(uri), outTmpDir); + puller.getSegmentFiles(fileSystem.makeQualified(zipPath), outTmpDir); Assert.assertTrue(outFile.exists()); Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath())); @@ -163,16 +158,14 @@ public class HdfsDataSegmentPullerTest final File outFile = new File(outTmpDir, "testZip"); outFile.delete(); - final URI uri = URI.create(uriBase.toString() + zipPath); - - try (final OutputStream outputStream = miniCluster.getFileSystem().create(zipPath); + try (final OutputStream outputStream = fileSystem.create(zipPath); final OutputStream gzStream = new GZIPOutputStream(outputStream); final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) { ByteStreams.copy(inputStream, gzStream); } try { Assert.assertFalse(outFile.exists()); - puller.getSegmentFiles(new Path(uri), outTmpDir); + puller.getSegmentFiles(fileSystem.makeQualified(zipPath), outTmpDir); Assert.assertTrue(outFile.exists()); Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath())); @@ -197,15 +190,13 @@ public class HdfsDataSegmentPullerTest final File outFile = new File(outTmpDir, "test.txt"); outFile.delete(); - final URI uri = URI.create(uriBase.toString() + perTestPath); - - try (final OutputStream outputStream = miniCluster.getFileSystem().create(zipPath); + try (final OutputStream outputStream = fileSystem.create(zipPath); final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) { ByteStreams.copy(inputStream, outputStream); } try { Assert.assertFalse(outFile.exists()); - puller.getSegmentFiles(new Path(uri), outTmpDir); + puller.getSegmentFiles(fileSystem.makeQualified(perTestPath), outTmpDir); Assert.assertTrue(outFile.exists()); Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath())); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java index 292f683f9a4..d0a23a8da3c 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java @@ -79,14 +79,15 @@ public class HdfsClasspathSetupTest conf = new Configuration(true); localFS = new LocalFileSystem(); localFS.initialize(hdfsTmpDir.toURI(), conf); + localFS.setWorkingDirectory(new Path(hdfsTmpDir.toURI())); } @Before public void setUp() throws IOException { // intermedatePath and finalClasspath are relative to hdfsTmpDir directory. - intermediatePath = new Path(StringUtils.format("/tmp/classpath/%s", UUIDUtils.generateUuid())); - finalClasspath = new Path(StringUtils.format("/tmp/intermediate/%s", UUIDUtils.generateUuid())); + intermediatePath = new Path(StringUtils.format("tmp/classpath/%s", UUIDUtils.generateUuid())); + finalClasspath = new Path(StringUtils.format("tmp/intermediate/%s", UUIDUtils.generateUuid())); dummyJarFile = tempFolder.newFile("dummy-test.jar"); Files.copy( new ByteArrayInputStream(StringUtils.toUtf8(dummyJarString)), @@ -116,7 +117,7 @@ public class HdfsClasspathSetupTest public void testAddSnapshotJarToClasspath() throws IOException { Job job = Job.getInstance(conf, "test-job"); - Path intermediatePath = new Path("/tmp/classpath"); + Path intermediatePath = new Path("tmp/classpath"); JobHelper.addSnapshotJarToClassPath(dummyJarFile, intermediatePath, localFS, job); Path expectedJarPath = new Path(intermediatePath, dummyJarFile.getName()); // check file gets uploaded to HDFS