mirror of https://github.com/apache/druid.git
Cleanup test dependencies in hdfs-storage extension (#11563)
* Cleanup test dependencies in hdfs-storage extension * Fix working directory in LocalFileSystem in indexing-hadoop test
This commit is contained in:
parent
bef6f43e3d
commit
2004a94675
|
@ -372,13 +372,6 @@
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- Tests -->
|
<!-- Tests -->
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.hadoop</groupId>
|
|
||||||
<artifactId>hadoop-common</artifactId>
|
|
||||||
<version>${hadoop.compile.version}</version>
|
|
||||||
<classifier>tests</classifier>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>junit</groupId>
|
<groupId>junit</groupId>
|
||||||
<artifactId>junit</artifactId>
|
<artifactId>junit</artifactId>
|
||||||
|
@ -397,19 +390,6 @@
|
||||||
<type>test-jar</type>
|
<type>test-jar</type>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.hadoop</groupId>
|
|
||||||
<artifactId>hadoop-hdfs</artifactId>
|
|
||||||
<version>${hadoop.compile.version}</version>
|
|
||||||
<classifier>tests</classifier>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.hadoop</groupId>
|
|
||||||
<artifactId>hadoop-hdfs</artifactId>
|
|
||||||
<version>${hadoop.compile.version}</version>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.druid</groupId>
|
<groupId>org.apache.druid</groupId>
|
||||||
<artifactId>druid-indexing-hadoop</artifactId>
|
<artifactId>druid-indexing-hadoop</artifactId>
|
||||||
|
|
|
@ -39,8 +39,9 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
|
||||||
import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
|
import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
|
||||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -246,7 +247,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
|
||||||
|
|
||||||
public static class ReaderTest
|
public static class ReaderTest
|
||||||
{
|
{
|
||||||
private static final String PATH = "/test";
|
private static final String PATH = "test";
|
||||||
private static final int NUM_FILE = 3;
|
private static final int NUM_FILE = 3;
|
||||||
private static final String KEY_VALUE_SEPARATOR = ",";
|
private static final String KEY_VALUE_SEPARATOR = ",";
|
||||||
private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz";
|
private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz";
|
||||||
|
@ -254,7 +255,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
|
||||||
@Rule
|
@Rule
|
||||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||||
|
|
||||||
private MiniDFSCluster dfsCluster;
|
private FileSystem fileSystem;
|
||||||
private HdfsInputSource target;
|
private HdfsInputSource target;
|
||||||
private Set<Path> paths;
|
private Set<Path> paths;
|
||||||
private Map<Long, String> timestampToValue;
|
private Map<Long, String> timestampToValue;
|
||||||
|
@ -266,8 +267,9 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
|
||||||
|
|
||||||
File dir = temporaryFolder.getRoot();
|
File dir = temporaryFolder.getRoot();
|
||||||
Configuration configuration = new Configuration(true);
|
Configuration configuration = new Configuration(true);
|
||||||
configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dir.getAbsolutePath());
|
fileSystem = new LocalFileSystem();
|
||||||
dfsCluster = new MiniDFSCluster.Builder(configuration).build();
|
fileSystem.initialize(dir.toURI(), configuration);
|
||||||
|
fileSystem.setWorkingDirectory(new Path(dir.getAbsolutePath()));
|
||||||
|
|
||||||
paths = IntStream.range(0, NUM_FILE)
|
paths = IntStream.range(0, NUM_FILE)
|
||||||
.mapToObj(
|
.mapToObj(
|
||||||
|
@ -275,7 +277,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
|
||||||
char value = ALPHABET.charAt(i % ALPHABET.length());
|
char value = ALPHABET.charAt(i % ALPHABET.length());
|
||||||
timestampToValue.put((long) i, Character.toString(value));
|
timestampToValue.put((long) i, Character.toString(value));
|
||||||
return createFile(
|
return createFile(
|
||||||
dfsCluster,
|
fileSystem,
|
||||||
String.valueOf(i),
|
String.valueOf(i),
|
||||||
i + KEY_VALUE_SEPARATOR + value
|
i + KEY_VALUE_SEPARATOR + value
|
||||||
);
|
);
|
||||||
|
@ -284,30 +286,29 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
target = HdfsInputSource.builder()
|
target = HdfsInputSource.builder()
|
||||||
.paths(dfsCluster.getURI() + PATH + "*")
|
.paths(fileSystem.makeQualified(new Path(PATH)) + "*")
|
||||||
.configuration(CONFIGURATION)
|
.configuration(CONFIGURATION)
|
||||||
.inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
|
.inputSourceConfig(new HdfsInputSourceConfig(ImmutableSet.of("hdfs", "file")))
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void teardown()
|
public void teardown() throws IOException
|
||||||
{
|
{
|
||||||
if (dfsCluster != null) {
|
temporaryFolder.delete();
|
||||||
dfsCluster.shutdown(true);
|
fileSystem.close();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Path createFile(MiniDFSCluster dfsCluster, String pathSuffix, String contents)
|
private static Path createFile(FileSystem fs, String pathSuffix, String contents)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
Path path = new Path(PATH + pathSuffix);
|
Path path = new Path(PATH + pathSuffix);
|
||||||
try (Writer writer = new BufferedWriter(
|
try (Writer writer = new BufferedWriter(
|
||||||
new OutputStreamWriter(dfsCluster.getFileSystem().create(path), StandardCharsets.UTF_8)
|
new OutputStreamWriter(fs.create(path), StandardCharsets.UTF_8)
|
||||||
)) {
|
)) {
|
||||||
writer.write(contents);
|
writer.write(contents);
|
||||||
}
|
}
|
||||||
return path;
|
return fs.makeQualified(path);
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (IOException e) {
|
||||||
throw new UncheckedIOException(e);
|
throw new UncheckedIOException(e);
|
||||||
|
@ -339,7 +340,6 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
|
||||||
splits.forEach(split -> Assert.assertEquals(1, split.get().size()));
|
splits.forEach(split -> Assert.assertEquals(1, split.get().size()));
|
||||||
Set<Path> actualPaths = splits.stream()
|
Set<Path> actualPaths = splits.stream()
|
||||||
.flatMap(split -> split.get().stream())
|
.flatMap(split -> split.get().stream())
|
||||||
.map(Path::getPathWithoutSchemeAndAuthority)
|
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
Assert.assertEquals(paths, actualPaths);
|
Assert.assertEquals(paths, actualPaths);
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,8 +25,9 @@ import org.apache.druid.java.util.common.IOE;
|
||||||
import org.apache.druid.java.util.common.StringUtils;
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
import org.apache.druid.storage.hdfs.HdfsFileTimestampVersionFinder;
|
import org.apache.druid.storage.hdfs.HdfsFileTimestampVersionFinder;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -45,10 +46,10 @@ import java.util.regex.Pattern;
|
||||||
public class HdfsFileTimestampVersionFinderTest
|
public class HdfsFileTimestampVersionFinderTest
|
||||||
{
|
{
|
||||||
|
|
||||||
private static MiniDFSCluster miniCluster;
|
private static FileSystem fileSystem;
|
||||||
private static File hdfsTmpDir;
|
private static File hdfsTmpDir;
|
||||||
private static Path filePath = new Path("/tmp/foo");
|
private static Path filePath = new Path("tmp1/foo");
|
||||||
private static Path perTestPath = new Path("/tmp/tmp2");
|
private static Path perTestPath = new Path("tmp1/tmp2");
|
||||||
private static String pathContents = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum";
|
private static String pathContents = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum";
|
||||||
private static byte[] pathByteContents = StringUtils.toUtf8(pathContents);
|
private static byte[] pathByteContents = StringUtils.toUtf8(pathContents);
|
||||||
private static Configuration conf;
|
private static Configuration conf;
|
||||||
|
@ -61,14 +62,15 @@ public class HdfsFileTimestampVersionFinderTest
|
||||||
throw new IOE("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath());
|
throw new IOE("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath());
|
||||||
}
|
}
|
||||||
conf = new Configuration(true);
|
conf = new Configuration(true);
|
||||||
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath());
|
fileSystem = new LocalFileSystem();
|
||||||
miniCluster = new MiniDFSCluster.Builder(conf).build();
|
fileSystem.initialize(hdfsTmpDir.toURI(), conf);
|
||||||
|
fileSystem.setWorkingDirectory(new Path(hdfsTmpDir.toURI()));
|
||||||
|
|
||||||
final File tmpFile = File.createTempFile("hdfsHandlerTest", ".data");
|
final File tmpFile = File.createTempFile("hdfsHandlerTest", ".data");
|
||||||
tmpFile.delete();
|
tmpFile.delete();
|
||||||
try {
|
try {
|
||||||
Files.copy(new ByteArrayInputStream(pathByteContents), tmpFile.toPath());
|
Files.copy(new ByteArrayInputStream(pathByteContents), tmpFile.toPath());
|
||||||
try (OutputStream stream = miniCluster.getFileSystem().create(filePath)) {
|
try (OutputStream stream = fileSystem.create(filePath)) {
|
||||||
Files.copy(tmpFile.toPath(), stream);
|
Files.copy(tmpFile.toPath(), stream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -80,10 +82,8 @@ public class HdfsFileTimestampVersionFinderTest
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDownStatic() throws IOException
|
public static void tearDownStatic() throws IOException
|
||||||
{
|
{
|
||||||
if (miniCluster != null) {
|
|
||||||
miniCluster.shutdown(true);
|
|
||||||
}
|
|
||||||
FileUtils.deleteDirectory(hdfsTmpDir);
|
FileUtils.deleteDirectory(hdfsTmpDir);
|
||||||
|
fileSystem.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -98,7 +98,7 @@ public class HdfsFileTimestampVersionFinderTest
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws IOException
|
public void tearDown() throws IOException
|
||||||
{
|
{
|
||||||
miniCluster.getFileSystem().delete(perTestPath, true);
|
fileSystem.delete(perTestPath, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -106,8 +106,8 @@ public class HdfsFileTimestampVersionFinderTest
|
||||||
public void testSimpleLatestVersion() throws IOException, InterruptedException
|
public void testSimpleLatestVersion() throws IOException, InterruptedException
|
||||||
{
|
{
|
||||||
final Path oldPath = new Path(perTestPath, "555test.txt");
|
final Path oldPath = new Path(perTestPath, "555test.txt");
|
||||||
Assert.assertFalse(miniCluster.getFileSystem().exists(oldPath));
|
Assert.assertFalse(fileSystem.exists(oldPath));
|
||||||
try (final OutputStream outputStream = miniCluster.getFileSystem().create(oldPath);
|
try (final OutputStream outputStream = fileSystem.create(oldPath);
|
||||||
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
||||||
ByteStreams.copy(inputStream, outputStream);
|
ByteStreams.copy(inputStream, outputStream);
|
||||||
}
|
}
|
||||||
|
@ -115,21 +115,23 @@ public class HdfsFileTimestampVersionFinderTest
|
||||||
Thread.sleep(10);
|
Thread.sleep(10);
|
||||||
|
|
||||||
final Path newPath = new Path(perTestPath, "666test.txt");
|
final Path newPath = new Path(perTestPath, "666test.txt");
|
||||||
Assert.assertFalse(miniCluster.getFileSystem().exists(newPath));
|
Assert.assertFalse(fileSystem.exists(newPath));
|
||||||
try (final OutputStream outputStream = miniCluster.getFileSystem().create(newPath);
|
try (final OutputStream outputStream = fileSystem.create(newPath);
|
||||||
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
||||||
ByteStreams.copy(inputStream, outputStream);
|
ByteStreams.copy(inputStream, outputStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertEquals(newPath.toString(), finder.getLatestVersion(oldPath.toUri(), Pattern.compile(".*")).getPath());
|
Assert.assertEquals(
|
||||||
|
fileSystem.makeQualified(newPath).toUri(),
|
||||||
|
finder.getLatestVersion(fileSystem.makeQualified(oldPath).toUri(), Pattern.compile(".*")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAlreadyLatestVersion() throws IOException, InterruptedException
|
public void testAlreadyLatestVersion() throws IOException, InterruptedException
|
||||||
{
|
{
|
||||||
final Path oldPath = new Path(perTestPath, "555test.txt");
|
final Path oldPath = new Path(perTestPath, "555test.txt");
|
||||||
Assert.assertFalse(miniCluster.getFileSystem().exists(oldPath));
|
Assert.assertFalse(fileSystem.exists(oldPath));
|
||||||
try (final OutputStream outputStream = miniCluster.getFileSystem().create(oldPath);
|
try (final OutputStream outputStream = fileSystem.create(oldPath);
|
||||||
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
||||||
ByteStreams.copy(inputStream, outputStream);
|
ByteStreams.copy(inputStream, outputStream);
|
||||||
}
|
}
|
||||||
|
@ -137,29 +139,31 @@ public class HdfsFileTimestampVersionFinderTest
|
||||||
Thread.sleep(10);
|
Thread.sleep(10);
|
||||||
|
|
||||||
final Path newPath = new Path(perTestPath, "666test.txt");
|
final Path newPath = new Path(perTestPath, "666test.txt");
|
||||||
Assert.assertFalse(miniCluster.getFileSystem().exists(newPath));
|
Assert.assertFalse(fileSystem.exists(newPath));
|
||||||
try (final OutputStream outputStream = miniCluster.getFileSystem().create(newPath);
|
try (final OutputStream outputStream = fileSystem.create(newPath);
|
||||||
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
||||||
ByteStreams.copy(inputStream, outputStream);
|
ByteStreams.copy(inputStream, outputStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertEquals(newPath.toString(), finder.getLatestVersion(newPath.toUri(), Pattern.compile(".*")).getPath());
|
Assert.assertEquals(
|
||||||
|
fileSystem.makeQualified(newPath).toUri(),
|
||||||
|
finder.getLatestVersion(fileSystem.makeQualified(newPath).toUri(), Pattern.compile(".*")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNoLatestVersion() throws IOException
|
public void testNoLatestVersion() throws IOException
|
||||||
{
|
{
|
||||||
final Path oldPath = new Path(perTestPath, "555test.txt");
|
final Path oldPath = new Path(perTestPath, "555test.txt");
|
||||||
Assert.assertFalse(miniCluster.getFileSystem().exists(oldPath));
|
Assert.assertFalse(fileSystem.exists(oldPath));
|
||||||
Assert.assertNull(finder.getLatestVersion(oldPath.toUri(), Pattern.compile(".*")));
|
Assert.assertNull(finder.getLatestVersion(fileSystem.makeQualified(oldPath).toUri(), Pattern.compile(".*")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSimpleLatestVersionInDir() throws IOException, InterruptedException
|
public void testSimpleLatestVersionInDir() throws IOException, InterruptedException
|
||||||
{
|
{
|
||||||
final Path oldPath = new Path(perTestPath, "555test.txt");
|
final Path oldPath = new Path(perTestPath, "555test.txt");
|
||||||
Assert.assertFalse(miniCluster.getFileSystem().exists(oldPath));
|
Assert.assertFalse(fileSystem.exists(oldPath));
|
||||||
try (final OutputStream outputStream = miniCluster.getFileSystem().create(oldPath);
|
try (final OutputStream outputStream = fileSystem.create(oldPath);
|
||||||
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
||||||
ByteStreams.copy(inputStream, outputStream);
|
ByteStreams.copy(inputStream, outputStream);
|
||||||
}
|
}
|
||||||
|
@ -167,24 +171,23 @@ public class HdfsFileTimestampVersionFinderTest
|
||||||
Thread.sleep(10);
|
Thread.sleep(10);
|
||||||
|
|
||||||
final Path newPath = new Path(perTestPath, "666test.txt");
|
final Path newPath = new Path(perTestPath, "666test.txt");
|
||||||
Assert.assertFalse(miniCluster.getFileSystem().exists(newPath));
|
Assert.assertFalse(fileSystem.exists(newPath));
|
||||||
try (final OutputStream outputStream = miniCluster.getFileSystem().create(newPath);
|
try (final OutputStream outputStream = fileSystem.create(newPath);
|
||||||
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
||||||
ByteStreams.copy(inputStream, outputStream);
|
ByteStreams.copy(inputStream, outputStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
newPath.toString(),
|
fileSystem.makeQualified(newPath).toUri(),
|
||||||
finder.getLatestVersion(perTestPath.toUri(), Pattern.compile(".*test\\.txt")).getPath()
|
finder.getLatestVersion(fileSystem.makeQualified(perTestPath).toUri(), Pattern.compile(".*test\\.txt")));
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSkipMismatch() throws IOException, InterruptedException
|
public void testSkipMismatch() throws IOException, InterruptedException
|
||||||
{
|
{
|
||||||
final Path oldPath = new Path(perTestPath, "555test.txt");
|
final Path oldPath = new Path(perTestPath, "555test.txt");
|
||||||
Assert.assertFalse(miniCluster.getFileSystem().exists(oldPath));
|
Assert.assertFalse(fileSystem.exists(oldPath));
|
||||||
try (final OutputStream outputStream = miniCluster.getFileSystem().create(oldPath);
|
try (final OutputStream outputStream = fileSystem.create(oldPath);
|
||||||
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
||||||
ByteStreams.copy(inputStream, outputStream);
|
ByteStreams.copy(inputStream, outputStream);
|
||||||
}
|
}
|
||||||
|
@ -192,15 +195,14 @@ public class HdfsFileTimestampVersionFinderTest
|
||||||
Thread.sleep(10);
|
Thread.sleep(10);
|
||||||
|
|
||||||
final Path newPath = new Path(perTestPath, "666test.txt2");
|
final Path newPath = new Path(perTestPath, "666test.txt2");
|
||||||
Assert.assertFalse(miniCluster.getFileSystem().exists(newPath));
|
Assert.assertFalse(fileSystem.exists(newPath));
|
||||||
try (final OutputStream outputStream = miniCluster.getFileSystem().create(newPath);
|
try (final OutputStream outputStream = fileSystem.create(newPath);
|
||||||
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
||||||
ByteStreams.copy(inputStream, outputStream);
|
ByteStreams.copy(inputStream, outputStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
oldPath.toString(),
|
fileSystem.makeQualified(oldPath).toUri(),
|
||||||
finder.getLatestVersion(perTestPath.toUri(), Pattern.compile(".*test\\.txt")).getPath()
|
finder.getLatestVersion(fileSystem.makeQualified(perTestPath).toUri(), Pattern.compile(".*test\\.txt")));
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,8 +26,9 @@ import org.apache.druid.java.util.common.StringUtils;
|
||||||
import org.apache.druid.segment.loading.SegmentLoadingException;
|
import org.apache.druid.segment.loading.SegmentLoadingException;
|
||||||
import org.apache.druid.utils.CompressionUtils;
|
import org.apache.druid.utils.CompressionUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -41,7 +42,6 @@ import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.URI;
|
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.util.zip.GZIPOutputStream;
|
import java.util.zip.GZIPOutputStream;
|
||||||
|
|
||||||
|
@ -50,11 +50,10 @@ import java.util.zip.GZIPOutputStream;
|
||||||
*/
|
*/
|
||||||
public class HdfsDataSegmentPullerTest
|
public class HdfsDataSegmentPullerTest
|
||||||
{
|
{
|
||||||
private static MiniDFSCluster miniCluster;
|
private static FileSystem fileSystem;
|
||||||
private static File hdfsTmpDir;
|
private static File hdfsTmpDir;
|
||||||
private static URI uriBase;
|
private static Path filePath = new Path("tmp/foo");
|
||||||
private static Path filePath = new Path("/tmp/foo");
|
private static Path perTestPath = new Path("tmp/tmp2");
|
||||||
private static Path perTestPath = new Path("/tmp/tmp2");
|
|
||||||
private static String pathContents = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum";
|
private static String pathContents = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum";
|
||||||
private static byte[] pathByteContents = StringUtils.toUtf8(pathContents);
|
private static byte[] pathByteContents = StringUtils.toUtf8(pathContents);
|
||||||
private static Configuration conf;
|
private static Configuration conf;
|
||||||
|
@ -67,15 +66,15 @@ public class HdfsDataSegmentPullerTest
|
||||||
throw new IOE("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath());
|
throw new IOE("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath());
|
||||||
}
|
}
|
||||||
conf = new Configuration(true);
|
conf = new Configuration(true);
|
||||||
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath());
|
fileSystem = new LocalFileSystem();
|
||||||
miniCluster = new MiniDFSCluster.Builder(conf).build();
|
fileSystem.initialize(hdfsTmpDir.toURI(), conf);
|
||||||
uriBase = miniCluster.getURI(0);
|
fileSystem.setWorkingDirectory(new Path(hdfsTmpDir.toURI()));
|
||||||
|
|
||||||
final File tmpFile = File.createTempFile("hdfsHandlerTest", ".data");
|
final File tmpFile = File.createTempFile("hdfsHandlerTest", ".data");
|
||||||
tmpFile.delete();
|
tmpFile.delete();
|
||||||
try {
|
try {
|
||||||
Files.copy(new ByteArrayInputStream(pathByteContents), tmpFile.toPath());
|
Files.copy(new ByteArrayInputStream(pathByteContents), tmpFile.toPath());
|
||||||
try (OutputStream stream = miniCluster.getFileSystem().create(filePath)) {
|
try (OutputStream stream = fileSystem.create(filePath)) {
|
||||||
Files.copy(tmpFile.toPath(), stream);
|
Files.copy(tmpFile.toPath(), stream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -87,10 +86,8 @@ public class HdfsDataSegmentPullerTest
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDownStatic() throws IOException
|
public static void tearDownStatic() throws IOException
|
||||||
{
|
{
|
||||||
if (miniCluster != null) {
|
|
||||||
miniCluster.shutdown(true);
|
|
||||||
}
|
|
||||||
FileUtils.deleteDirectory(hdfsTmpDir);
|
FileUtils.deleteDirectory(hdfsTmpDir);
|
||||||
|
fileSystem.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -105,7 +102,7 @@ public class HdfsDataSegmentPullerTest
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws IOException
|
public void tearDown() throws IOException
|
||||||
{
|
{
|
||||||
miniCluster.getFileSystem().delete(perTestPath, true);
|
fileSystem.delete(perTestPath, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -118,8 +115,6 @@ public class HdfsDataSegmentPullerTest
|
||||||
|
|
||||||
final File outTmpDir = FileUtils.createTempDir();
|
final File outTmpDir = FileUtils.createTempDir();
|
||||||
|
|
||||||
final URI uri = URI.create(uriBase.toString() + zipPath);
|
|
||||||
|
|
||||||
try (final OutputStream stream = new FileOutputStream(tmpFile)) {
|
try (final OutputStream stream = new FileOutputStream(tmpFile)) {
|
||||||
ByteStreams.copy(new ByteArrayInputStream(pathByteContents), stream);
|
ByteStreams.copy(new ByteArrayInputStream(pathByteContents), stream);
|
||||||
}
|
}
|
||||||
|
@ -128,12 +123,12 @@ public class HdfsDataSegmentPullerTest
|
||||||
final File outFile = new File(outTmpDir, tmpFile.getName());
|
final File outFile = new File(outTmpDir, tmpFile.getName());
|
||||||
outFile.delete();
|
outFile.delete();
|
||||||
|
|
||||||
try (final OutputStream stream = miniCluster.getFileSystem().create(zipPath)) {
|
try (final OutputStream stream = fileSystem.create(zipPath)) {
|
||||||
CompressionUtils.zip(tmpDir, stream);
|
CompressionUtils.zip(tmpDir, stream);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
Assert.assertFalse(outFile.exists());
|
Assert.assertFalse(outFile.exists());
|
||||||
puller.getSegmentFiles(new Path(uri), outTmpDir);
|
puller.getSegmentFiles(fileSystem.makeQualified(zipPath), outTmpDir);
|
||||||
Assert.assertTrue(outFile.exists());
|
Assert.assertTrue(outFile.exists());
|
||||||
|
|
||||||
Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath()));
|
Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath()));
|
||||||
|
@ -163,16 +158,14 @@ public class HdfsDataSegmentPullerTest
|
||||||
final File outFile = new File(outTmpDir, "testZip");
|
final File outFile = new File(outTmpDir, "testZip");
|
||||||
outFile.delete();
|
outFile.delete();
|
||||||
|
|
||||||
final URI uri = URI.create(uriBase.toString() + zipPath);
|
try (final OutputStream outputStream = fileSystem.create(zipPath);
|
||||||
|
|
||||||
try (final OutputStream outputStream = miniCluster.getFileSystem().create(zipPath);
|
|
||||||
final OutputStream gzStream = new GZIPOutputStream(outputStream);
|
final OutputStream gzStream = new GZIPOutputStream(outputStream);
|
||||||
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
||||||
ByteStreams.copy(inputStream, gzStream);
|
ByteStreams.copy(inputStream, gzStream);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
Assert.assertFalse(outFile.exists());
|
Assert.assertFalse(outFile.exists());
|
||||||
puller.getSegmentFiles(new Path(uri), outTmpDir);
|
puller.getSegmentFiles(fileSystem.makeQualified(zipPath), outTmpDir);
|
||||||
Assert.assertTrue(outFile.exists());
|
Assert.assertTrue(outFile.exists());
|
||||||
|
|
||||||
Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath()));
|
Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath()));
|
||||||
|
@ -197,15 +190,13 @@ public class HdfsDataSegmentPullerTest
|
||||||
final File outFile = new File(outTmpDir, "test.txt");
|
final File outFile = new File(outTmpDir, "test.txt");
|
||||||
outFile.delete();
|
outFile.delete();
|
||||||
|
|
||||||
final URI uri = URI.create(uriBase.toString() + perTestPath);
|
try (final OutputStream outputStream = fileSystem.create(zipPath);
|
||||||
|
|
||||||
try (final OutputStream outputStream = miniCluster.getFileSystem().create(zipPath);
|
|
||||||
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
|
||||||
ByteStreams.copy(inputStream, outputStream);
|
ByteStreams.copy(inputStream, outputStream);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
Assert.assertFalse(outFile.exists());
|
Assert.assertFalse(outFile.exists());
|
||||||
puller.getSegmentFiles(new Path(uri), outTmpDir);
|
puller.getSegmentFiles(fileSystem.makeQualified(perTestPath), outTmpDir);
|
||||||
Assert.assertTrue(outFile.exists());
|
Assert.assertTrue(outFile.exists());
|
||||||
|
|
||||||
Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath()));
|
Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath()));
|
||||||
|
|
|
@ -79,14 +79,15 @@ public class HdfsClasspathSetupTest
|
||||||
conf = new Configuration(true);
|
conf = new Configuration(true);
|
||||||
localFS = new LocalFileSystem();
|
localFS = new LocalFileSystem();
|
||||||
localFS.initialize(hdfsTmpDir.toURI(), conf);
|
localFS.initialize(hdfsTmpDir.toURI(), conf);
|
||||||
|
localFS.setWorkingDirectory(new Path(hdfsTmpDir.toURI()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws IOException
|
public void setUp() throws IOException
|
||||||
{
|
{
|
||||||
// intermedatePath and finalClasspath are relative to hdfsTmpDir directory.
|
// intermedatePath and finalClasspath are relative to hdfsTmpDir directory.
|
||||||
intermediatePath = new Path(StringUtils.format("/tmp/classpath/%s", UUIDUtils.generateUuid()));
|
intermediatePath = new Path(StringUtils.format("tmp/classpath/%s", UUIDUtils.generateUuid()));
|
||||||
finalClasspath = new Path(StringUtils.format("/tmp/intermediate/%s", UUIDUtils.generateUuid()));
|
finalClasspath = new Path(StringUtils.format("tmp/intermediate/%s", UUIDUtils.generateUuid()));
|
||||||
dummyJarFile = tempFolder.newFile("dummy-test.jar");
|
dummyJarFile = tempFolder.newFile("dummy-test.jar");
|
||||||
Files.copy(
|
Files.copy(
|
||||||
new ByteArrayInputStream(StringUtils.toUtf8(dummyJarString)),
|
new ByteArrayInputStream(StringUtils.toUtf8(dummyJarString)),
|
||||||
|
@ -116,7 +117,7 @@ public class HdfsClasspathSetupTest
|
||||||
public void testAddSnapshotJarToClasspath() throws IOException
|
public void testAddSnapshotJarToClasspath() throws IOException
|
||||||
{
|
{
|
||||||
Job job = Job.getInstance(conf, "test-job");
|
Job job = Job.getInstance(conf, "test-job");
|
||||||
Path intermediatePath = new Path("/tmp/classpath");
|
Path intermediatePath = new Path("tmp/classpath");
|
||||||
JobHelper.addSnapshotJarToClassPath(dummyJarFile, intermediatePath, localFS, job);
|
JobHelper.addSnapshotJarToClassPath(dummyJarFile, intermediatePath, localFS, job);
|
||||||
Path expectedJarPath = new Path(intermediatePath, dummyJarFile.getName());
|
Path expectedJarPath = new Path(intermediatePath, dummyJarFile.getName());
|
||||||
// check file gets uploaded to HDFS
|
// check file gets uploaded to HDFS
|
||||||
|
|
Loading…
Reference in New Issue