diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index e0d268105d7..ccd20ab704f 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -123,6 +123,9 @@ Release 2.3.0 - UNRELEASED
     HADOOP-10006. Compilation failure in trunk for o.a.h.fs.swift.util.JSONUtil
     (Junping Du via stevel)
 
+    HADOOP-9016. HarFsInputStream.skip(long) must never return negative value.
+    (Ivan A. Veselovsky via jeagles)
+
 Release 2.2.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
index afa45246f5e..091b35a846a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
@@ -898,11 +898,15 @@ public class HarFileSystem extends FileSystem {
     private long position, start, end;
     //The underlying data input stream that the
     // underlying filesystem will return.
-    private FSDataInputStream underLyingStream;
+    private final FSDataInputStream underLyingStream;
     //one byte buffer
-    private byte[] oneBytebuff = new byte[1];
+    private final byte[] oneBytebuff = new byte[1];
+
     HarFsInputStream(FileSystem fs, Path path, long start,
         long length, int bufferSize) throws IOException {
+      if (length < 0) {
+        throw new IllegalArgumentException("Negative length [" + length + "]");
+      }
       underLyingStream = fs.open(path, bufferSize);
       underLyingStream.seek(start);
       // the start of this file in the part file
@@ -916,7 +920,7 @@ public class HarFileSystem extends FileSystem {
     @Override
     public synchronized int available() throws IOException {
       long remaining = end - underLyingStream.getPos();
-      if (remaining > (long)Integer.MAX_VALUE) {
+      if (remaining > Integer.MAX_VALUE) {
         return Integer.MAX_VALUE;
       }
       return (int) remaining;
@@ -948,10 +952,14 @@ public class HarFileSystem extends FileSystem {
       return (ret <= 0) ? -1: (oneBytebuff[0] & 0xff);
     }
 
+    // NB: currently this method is never actually executed because
+    // java.io.DataInputStream.read(byte[]) directly delegates to
+    // java.io.InputStream.read(byte[], int, int).
+    // However, it can potentially be invoked, so it is left intact for now.
     @Override
     public synchronized int read(byte[] b) throws IOException {
-      int ret = read(b, 0, b.length);
-      if (ret != -1) {
+      final int ret = read(b, 0, b.length);
+      if (ret > 0) {
         position += ret;
       }
       return ret;
@@ -980,15 +988,19 @@ public class HarFileSystem extends FileSystem {
     public synchronized long skip(long n) throws IOException {
       long tmpN = n;
       if (tmpN > 0) {
-        if (position + tmpN > end) {
-          tmpN = end - position;
-        }
+        final long actualRemaining = end - position;
+        if (tmpN > actualRemaining) {
+          tmpN = actualRemaining;
+        }
         underLyingStream.seek(tmpN + position);
         position += tmpN;
         return tmpN;
-      }
-      return (tmpN < 0)? -1 : 0;
-    }
+      }
+      // NB: the contract is described in java.io.InputStream.skip(long):
+      // this method returns the number of bytes actually skipped, so
+      // the return value should never be negative.
+      return 0;
+    }
 
     @Override
     public synchronized long getPos() throws IOException {
@@ -996,14 +1008,23 @@ public class HarFileSystem extends FileSystem {
     }
 
     @Override
-    public synchronized void seek(long pos) throws IOException {
-      if (pos < 0 || (start + pos > end)) {
-        throw new IOException("Failed to seek: EOF");
-      }
+    public synchronized void seek(final long pos) throws IOException {
+      validatePosition(pos);
       position = start + pos;
       underLyingStream.seek(position);
     }
 
+    private void validatePosition(final long pos) throws IOException {
+      if (pos < 0) {
+        throw new IOException("Negative position: " + pos);
+      }
+      final long length = end - start;
+      if (pos > length) {
+        throw new IOException("Position beyond the end "
+            + "of the stream (length = " + length + "): " + pos);
+      }
+    }
+
     @Override
     public boolean seekToNewSource(long targetPos) throws IOException {
       // do not need to implement this
@@ -1020,7 +1041,12 @@ public class HarFileSystem extends FileSystem {
         throws IOException {
       int nlength = length;
       if (start + nlength + pos > end) {
-        nlength = (int) (end - (start + pos));
+        // length corrected to the real remaining length:
+        nlength = (int) (end - start - pos);
+      }
+      if (nlength <= 0) {
+        // end of stream:
+        return -1;
       }
       return underLyingStream.read(pos + start, b, offset, nlength);
     }
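Reviewer note on the skip() change above: java.io.InputStream.skip(long) is specified to return the number of bytes actually skipped, and callers commonly accumulate that value, so the old behavior of returning -1 for a negative argument could silently corrupt a caller's byte accounting. A minimal, self-contained illustration (the class and helper names below are invented for this sketch and are not part of the patch):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // Sketch: why skip() must never return a negative value.
    public class SkipContractDemo {
      // A typical caller: keep skipping until the requested count is reached.
      static long skipFully(InputStream in, long toSkip) throws IOException {
        long skipped = 0;
        while (skipped < toSkip) {
          long s = in.skip(toSkip - skipped);
          if (s <= 0) {
            break; // EOF, or the stream cannot skip further
          }
          // If skip() could return a negative value here, the running
          // total would be silently decremented instead of advancing:
          skipped += s;
        }
        return skipped;
      }

      public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[10]);
        System.out.println(skipFully(in, 4)); // prints 4
        System.out.println(in.skip(-1));      // per the contract: 0, never -1
      }
    }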
diff --git a/hadoop-tools/hadoop-archives/src/test/java/org/apache/hadoop/tools/TestHadoopArchives.java b/hadoop-tools/hadoop-archives/src/test/java/org/apache/hadoop/tools/TestHadoopArchives.java
index b6310fd91b1..65bbbe451bf 100644
--- a/hadoop-tools/hadoop-archives/src/test/java/org/apache/hadoop/tools/TestHadoopArchives.java
+++ b/hadoop-tools/hadoop-archives/src/test/java/org/apache/hadoop/tools/TestHadoopArchives.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.tools;
 
 import java.io.ByteArrayOutputStream;
+import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.URI;
@@ -30,9 +31,13 @@ import java.util.StringTokenizer;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.HarFileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IOUtils;
@@ -42,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
+import static org.junit.Assert.*;
 import org.junit.Before;
 import org.junit.Test;
@@ -62,19 +68,36 @@ public class TestHadoopArchives {
 
   private static final String inputDir = "input";
 
   private Path inputPath;
+  private Path archivePath;
+  private final List<String> fileList = new ArrayList<String>();
   private MiniDFSCluster dfscluster;
   private Configuration conf;
   private FileSystem fs;
-  private Path archivePath;
 
-  static private Path createFile(Path dir, String filename, FileSystem fs)
-      throws IOException {
-    final Path f = new Path(dir, filename);
+  private static String createFile(Path root, FileSystem fs,
+      String... dirsAndFile) throws IOException {
+    String fileBaseName = dirsAndFile[dirsAndFile.length - 1];
+    return createFile(root, fs, fileBaseName.getBytes("UTF-8"), dirsAndFile);
+  }
+
+  private static String createFile(Path root, FileSystem fs,
+      byte[] fileContent, String... dirsAndFile) throws IOException {
+    StringBuilder sb = new StringBuilder();
+    for (String segment : dirsAndFile) {
+      if (sb.length() > 0) {
+        sb.append(Path.SEPARATOR);
+      }
+      sb.append(segment);
+    }
+    final Path f = new Path(root, sb.toString());
     final FSDataOutputStream out = fs.create(f);
-    out.write(filename.getBytes());
-    out.close();
-    return f;
+    try {
+      out.write(fileContent);
+    } finally {
+      out.close();
+    }
+    return sb.toString();
   }
 
   @Before
@@ -86,102 +109,80 @@ public class TestHadoopArchives {
     conf.set(CapacitySchedulerConfiguration.PREFIX
         + CapacitySchedulerConfiguration.ROOT + ".default."
         + CapacitySchedulerConfiguration.CAPACITY, "100");
-    dfscluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true)
-        .build();
+    dfscluster = new MiniDFSCluster
+        .Builder(conf)
+        .checkExitOnShutdown(true)
+        .numDataNodes(2)
+        .format(true)
+        .racks(null)
+        .build();
 
     fs = dfscluster.getFileSystem();
-    inputPath = new Path(fs.getHomeDirectory(), inputDir);
+
+    // prepare archive path:
     archivePath = new Path(fs.getHomeDirectory(), "archive");
+    fs.delete(archivePath, true);
+
+    // prepare input path:
+    inputPath = new Path(fs.getHomeDirectory(), inputDir);
+    fs.delete(inputPath, true);
     fs.mkdirs(inputPath);
-    createFile(inputPath, "a", fs);
-    createFile(inputPath, "b", fs);
-    createFile(inputPath, "c", fs);
+    // create basic input files:
+    fileList.add(createFile(inputPath, fs, "a"));
+    fileList.add(createFile(inputPath, fs, "b"));
+    fileList.add(createFile(inputPath, fs, "c"));
   }
 
   @After
   public void tearDown() throws Exception {
-    try {
-      if (dfscluster != null) {
-        dfscluster.shutdown();
-      }
-      if (dfscluster != null) {
-        dfscluster.shutdown();
-      }
-    } catch (Exception e) {
-      System.err.println(e);
+    if (dfscluster != null) {
+      dfscluster.shutdown();
     }
   }
 
   @Test
   public void testRelativePath() throws Exception {
-    fs.delete(archivePath, true);
-
     final Path sub1 = new Path(inputPath, "dir1");
     fs.mkdirs(sub1);
-    createFile(sub1, "a", fs);
+    createFile(inputPath, fs, sub1.getName(), "a");
     final FsShell shell = new FsShell(conf);
 
     final List<String> originalPaths = lsr(shell, "input");
-    System.out.println("originalPath: " + originalPaths);
-    final URI uri = fs.getUri();
-    final String prefix = "har://hdfs-" + uri.getHost() + ":" + uri.getPort()
-        + archivePath.toUri().getPath() + Path.SEPARATOR;
+    System.out.println("originalPaths: " + originalPaths);
 
-    {
-      final String harName = "foo.har";
-      final String[] args = { "-archiveName", harName, "-p", "input", "*",
-          "archive" };
-      System.setProperty(HadoopArchives.TEST_HADOOP_ARCHIVES_JAR_PATH,
-          HADOOP_ARCHIVES_JAR);
-      final HadoopArchives har = new HadoopArchives(conf);
-      Assert.assertEquals(0, ToolRunner.run(har, args));
+    // make the archive:
+    final String fullHarPathStr = makeArchive();
 
-      // compare results
-      final List<String> harPaths = lsr(shell, prefix + harName);
-      Assert.assertEquals(originalPaths, harPaths);
-    }
+    // compare results:
+    final List<String> harPaths = lsr(shell, fullHarPathStr);
+    Assert.assertEquals(originalPaths, harPaths);
   }
 
   @Test
   public void testPathWithSpaces() throws Exception {
-    fs.delete(archivePath, true);
-
     // create files/directories with spaces
-    createFile(inputPath, "c c", fs);
+    createFile(inputPath, fs, "c c");
     final Path sub1 = new Path(inputPath, "sub 1");
     fs.mkdirs(sub1);
-    createFile(sub1, "file x y z", fs);
-    createFile(sub1, "file", fs);
-    createFile(sub1, "x", fs);
-    createFile(sub1, "y", fs);
-    createFile(sub1, "z", fs);
+    createFile(sub1, fs, "file x y z");
+    createFile(sub1, fs, "file");
+    createFile(sub1, fs, "x");
+    createFile(sub1, fs, "y");
+    createFile(sub1, fs, "z");
     final Path sub2 = new Path(inputPath, "sub 1 with suffix");
     fs.mkdirs(sub2);
-    createFile(sub2, "z", fs);
+    createFile(sub2, fs, "z");
 
     final FsShell shell = new FsShell(conf);
-
     final String inputPathStr = inputPath.toUri().getPath();
-
     final List<String> originalPaths = lsr(shell, inputPathStr);
-    final URI uri = fs.getUri();
-    final String prefix = "har://hdfs-" + uri.getHost() + ":" + uri.getPort()
-        + archivePath.toUri().getPath() + Path.SEPARATOR;
 
-    {// Enable space replacement
-      final String harName = "foo.har";
-      final String[] args = { "-archiveName", harName, "-p", inputPathStr, "*",
-          archivePath.toString() };
-      System.setProperty(HadoopArchives.TEST_HADOOP_ARCHIVES_JAR_PATH,
-          HADOOP_ARCHIVES_JAR);
-      final HadoopArchives har = new HadoopArchives(conf);
-      Assert.assertEquals(0, ToolRunner.run(har, args));
-
-      // compare results
-      final List<String> harPaths = lsr(shell, prefix + harName);
-      Assert.assertEquals(originalPaths, harPaths);
-    }
+    // make the archive:
+    final String fullHarPathStr = makeArchive();
+    // compare results
+    final List<String> harPaths = lsr(shell, fullHarPathStr);
+    Assert.assertEquals(originalPaths, harPaths);
   }
 
   private static List<String> lsr(final FsShell shell, String dir)
@@ -222,4 +223,442 @@ public class TestHadoopArchives {
         .println("lsr paths = " + paths.toString().replace(", ", ",\n  "));
     return paths;
   }
+
+  @Test
+  public void testReadFileContent() throws Exception {
+    fileList.add(createFile(inputPath, fs, "c c"));
+    final Path sub1 = new Path(inputPath, "sub 1");
+    fs.mkdirs(sub1);
+    fileList.add(createFile(inputPath, fs, sub1.getName(), "file x y z"));
+    fileList.add(createFile(inputPath, fs, sub1.getName(), "file"));
+    fileList.add(createFile(inputPath, fs, sub1.getName(), "x"));
+    fileList.add(createFile(inputPath, fs, sub1.getName(), "y"));
+    fileList.add(createFile(inputPath, fs, sub1.getName(), "z"));
+    final Path sub2 = new Path(inputPath, "sub 1 with suffix");
+    fs.mkdirs(sub2);
+    fileList.add(createFile(inputPath, fs, sub2.getName(), "z"));
+    // Generate a big binary file content:
+    final byte[] binContent = prepareBin();
+    fileList.add(createFile(inputPath, fs, binContent, sub2.getName(), "bin"));
+    fileList.add(createFile(inputPath, fs, new byte[0],
+        sub2.getName(), "zero-length"));
+
+    final String fullHarPathStr = makeArchive();
+
+    // Create fresh HarFs:
+    final HarFileSystem harFileSystem = new HarFileSystem(fs);
+    try {
+      final URI harUri = new URI(fullHarPathStr);
+      harFileSystem.initialize(harUri, fs.getConf());
+      // now read the file content and compare it against the expected:
+      int readFileCount = 0;
+      for (final String pathStr0 : fileList) {
+        final Path path = new Path(fullHarPathStr + Path.SEPARATOR + pathStr0);
+        final String baseName = path.getName();
+        final FileStatus status = harFileSystem.getFileStatus(path);
+        if (status.isFile()) {
+          // read the file:
+          final byte[] actualContentSimple = readAllSimple(
+              harFileSystem.open(path), true);
+
+          final byte[] actualContentBuffer = readAllWithBuffer(
+              harFileSystem.open(path), true);
+          assertArrayEquals(actualContentSimple, actualContentBuffer);
+
+          final byte[] actualContentFully = readAllWithReadFully(
+              actualContentSimple.length,
+              harFileSystem.open(path), true);
+          assertArrayEquals(actualContentSimple, actualContentFully);
+
+          final byte[] actualContentSeek = readAllWithSeek(
+              actualContentSimple.length,
+              harFileSystem.open(path), true);
+          assertArrayEquals(actualContentSimple, actualContentSeek);
+
+          final byte[] actualContentRead4 =
+              readAllWithRead4(harFileSystem.open(path), true);
+          assertArrayEquals(actualContentSimple, actualContentRead4);
+
+          final byte[] actualContentSkip = readAllWithSkip(
+              actualContentSimple.length,
+              harFileSystem.open(path),
+              harFileSystem.open(path),
+              true);
+          assertArrayEquals(actualContentSimple, actualContentSkip);
+
+          if ("bin".equals(baseName)) {
+            assertArrayEquals(binContent, actualContentSimple);
+          } else if ("zero-length".equals(baseName)) {
+            assertEquals(0, actualContentSimple.length);
+          } else {
+            String actual = new String(actualContentSimple, "UTF-8");
+            assertEquals(baseName, actual);
+          }
+          readFileCount++;
+        }
+      }
+      assertEquals(fileList.size(), readFileCount);
+    } finally {
+      harFileSystem.close();
+    }
+  }
+
+  private static byte[] readAllSimple(FSDataInputStream fsdis, boolean close)
+      throws IOException {
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try {
+      int b;
+      while (true) {
+        b = fsdis.read();
+        if (b < 0) {
+          break;
+        } else {
+          baos.write(b);
+        }
+      }
+      baos.close();
+      return baos.toByteArray();
+    } finally {
+      if (close) {
+        fsdis.close();
+      }
+    }
+  }
+
+  private static byte[] readAllWithBuffer(FSDataInputStream fsdis,
+      boolean close) throws IOException {
+    try {
+      final int available = fsdis.available();
+      final byte[] buffer;
+      final ByteArrayOutputStream baos;
+      if (available < 0) {
+        buffer = new byte[1024];
+        baos = new ByteArrayOutputStream(buffer.length * 2);
+      } else {
+        buffer = new byte[available];
+        baos = new ByteArrayOutputStream(available);
+      }
+      int readIntoBuffer = 0;
+      int read;
+      while (true) {
+        read = fsdis.read(buffer, readIntoBuffer,
+            buffer.length - readIntoBuffer);
+        if (read < 0) {
+          // end of stream:
+          if (readIntoBuffer > 0) {
+            baos.write(buffer, 0, readIntoBuffer);
+          }
+          return baos.toByteArray();
+        } else {
+          readIntoBuffer += read;
+          if (readIntoBuffer == buffer.length) {
+            // the buffer is full: drop the buffered data to baos:
+            baos.write(buffer);
+            // reset the counter to start reading from the buffer beginning:
+            readIntoBuffer = 0;
+          } else if (readIntoBuffer > buffer.length) {
+            throw new IOException("Read more than the buffer length: "
+                + readIntoBuffer + ", buffer length = " + buffer.length);
+          }
+        }
+      }
+    } finally {
+      if (close) {
+        fsdis.close();
+      }
+    }
+  }
+
+  private static byte[] readAllWithReadFully(int totalLength,
+      FSDataInputStream fsdis, boolean close) throws IOException {
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    // Simulate reading of some data structures of known length:
+    final byte[] buffer = new byte[17];
+    final int times = totalLength / buffer.length;
+    final int remainder = totalLength % buffer.length;
+    // it would be simpler to leave the position tracking to the
+    // InputStream, but we need to check the methods #readFully(2)
+    // and #readFully(4) that receive the position as a parameter:
+    int position = 0;
+    try {
+      // read the "data structures":
+      for (int i=0; i<times; i++) {
+        fsdis.readFully(position, buffer);
+        position += buffer.length;
+        baos.write(buffer);
+      }
+      if (remainder > 0) {
+        // read the remainder:
+        fsdis.readFully(position, buffer, 0, remainder);
+        position += remainder;
+        baos.write(buffer, 0, remainder);
+      }
+      try {
+        fsdis.readFully(position, buffer, 0, 1);
+        assertTrue(false);
+      } catch (IOException ioe) {
+        // okay
+      }
+      assertEquals(totalLength, position);
+      final byte[] result = baos.toByteArray();
+      assertEquals(totalLength, result.length);
+      return result;
+    } finally {
+      if (close) {
+        fsdis.close();
+      }
+    }
+  }
+
+  private static byte[] readAllWithRead4(FSDataInputStream fsdis,
+      boolean close) throws IOException {
+    try {
+      final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      final byte[] buffer = new byte[17];
+      int totalRead = 0;
+      int read;
+      while (true) {
+        read = fsdis.read(totalRead, buffer, 0, buffer.length);
+        if (read > 0) {
+          totalRead += read;
+          baos.write(buffer, 0, read);
+        } else if (read < 0) {
+          break; // EOF
+        } else {
+          // read == 0:
+          // a zero result may be returned *only* if the 4th
+          // parameter is 0. Since in our case it is 'buffer.length',
+          // a zero return value clearly indicates a bug:
+          throw new AssertionError("FSDataInputStream#read(4) returned 0,"
+              + " while the 4th method parameter is " + buffer.length + ".");
+        }
+      }
+      final byte[] result = baos.toByteArray();
+      return result;
+    } finally {
+      if (close) {
+        fsdis.close();
+      }
+    }
+  }
+
+  private static byte[] readAllWithSeek(final int totalLength,
+      final FSDataInputStream fsdis, final boolean close)
+      throws IOException {
+    final byte[] result = new byte[totalLength];
+    long pos;
+    try {
+      // read the data in the reverse order, from
+      // the tail to the head, by pieces of 'buffer' length:
+      final byte[] buffer = new byte[17];
+      final int times = totalLength / buffer.length;
+      int read;
+      int expectedRead;
+      for (int i=times; i>=0; i--) {
+        pos = i * buffer.length;
+        fsdis.seek(pos);
+        // check that the seek is successful:
+        assertEquals(pos, fsdis.getPos());
+        read = fsdis.read(buffer);
+        // check that we read the right number of bytes:
+        if (i == times) {
+          expectedRead = totalLength % buffer.length; // remainder
+          if (expectedRead == 0) {
+            // a zero remainder corresponds to the EOS, so,
+            // by the contract of DataInputStream#read(byte[]),
+            // -1 should be returned:
+            expectedRead = -1;
+          }
+        } else {
+          expectedRead = buffer.length;
+        }
+        assertEquals(expectedRead, read);
+        if (read > 0) {
+          System.arraycopy(buffer, 0, result, (int)pos, read);
+        }
+      }
+
+      // finally, check that #seek() to a non-existing position leads to IOE:
+      expectSeekIOE(fsdis, Long.MAX_VALUE,
+          "Seek to Long.MAX_VALUE should lead to IOE.");
+      expectSeekIOE(fsdis, Long.MIN_VALUE,
+          "Seek to Long.MIN_VALUE should lead to IOE.");
+      long pp = -1L;
+      expectSeekIOE(fsdis, pp, "Seek to " + pp + " should lead to IOE.");
+
+      // NB: it is *possible* to #seek(length),
+      // but *impossible* to #seek(length + 1):
+      fsdis.seek(totalLength);
+      assertEquals(totalLength, fsdis.getPos());
+      pp = totalLength + 1;
+      expectSeekIOE(fsdis, pp,
+          "Seek to the length position + 1 (" + pp + ") should lead to IOE.");
+
+      return result;
+    } finally {
+      if (close) {
+        fsdis.close();
+      }
+    }
+  }
+
+  private static void expectSeekIOE(FSDataInputStream fsdis, long seekPos,
+      String message) {
+    try {
+      fsdis.seek(seekPos);
+      assertTrue(message + " (Position = " + fsdis.getPos() + ")", false);
+    } catch (IOException ioe) {
+      // okay
+    }
+  }
+
+  /*
+   * Reads the data in chunks from 2 input streams:
+   * reads a chunk from stream 1 and skips this chunk in stream 2,
+   * then reads the next chunk from stream 2 and skips this chunk
+   * in stream 1.
+   */
+  private static byte[] readAllWithSkip(
+      final int totalLength,
+      final FSDataInputStream fsdis1,
+      final FSDataInputStream fsdis2,
+      final boolean close)
+      throws IOException {
+    // test a negative skip arg:
+    assertEquals(0, fsdis1.skip(-1));
+    // test a zero skip arg:
+    assertEquals(0, fsdis1.skip(0));
+
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream(totalLength);
+    try {
+      // read the data forward, in pieces of 'buffer' length:
+      final byte[] buffer = new byte[17];
+      final int times = totalLength / buffer.length;
+      final int remainder = totalLength % buffer.length;
+      long skipped;
+      long expectedPosition;
+      int toGo;
+      for (int i=0; i<=times; i++) {
+        toGo = (i < times) ? buffer.length : remainder;
+        if (i % 2 == 0) {
+          fsdis1.readFully(buffer, 0, toGo);
+          skipped = skipUntilZero(fsdis2, toGo);
+        } else {
+          fsdis2.readFully(buffer, 0, toGo);
+          skipped = skipUntilZero(fsdis1, toGo);
+        }
+        if (i < times) {
+          assertEquals(buffer.length, skipped);
+          expectedPosition = (i + 1) * buffer.length;
+        } else {
+          // the remainder:
+          if (remainder > 0) {
+            assertEquals(remainder, skipped);
+          } else {
+            assertEquals(0, skipped);
+          }
+          expectedPosition = totalLength;
+        }
+        // check that the 2 streams have equal and correct positions:
+        assertEquals(expectedPosition, fsdis1.getPos());
+        assertEquals(expectedPosition, fsdis2.getPos());
+        // save the read data:
+        if (toGo > 0) {
+          baos.write(buffer, 0, toGo);
+        }
+      }
+
+      // finally, check that the ended stream cannot skip:
+      assertEquals(0, fsdis1.skip(-1));
+      assertEquals(0, fsdis1.skip(0));
+      assertEquals(0, fsdis1.skip(1));
+      assertEquals(0, fsdis1.skip(Long.MAX_VALUE));
+
+      return baos.toByteArray();
+    } finally {
+      if (close) {
+        fsdis1.close();
+        fsdis2.close();
+      }
+    }
+  }
+
+  private static long skipUntilZero(final FilterInputStream fis,
+      final long toSkip) throws IOException {
+    long skipped = 0;
+    long remainsToSkip = toSkip;
+    long s;
+    while (skipped < toSkip) {
+      s = fis.skip(remainsToSkip); // actually skipped
+      if (s == 0) {
+        return skipped; // EOF or impossible to skip.
+      }
+      skipped += s;
+      remainsToSkip -= s;
+    }
+    return skipped;
+  }
+
+  private static byte[] prepareBin() {
+    byte[] bb = new byte[77777];
+    for (int i=0; i
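Note: the excerpt is cut off above, mid-way through prepareBin(), and the makeArchive() helper that the new tests call is not shown. A plausible reconstruction of both follows; the byte pattern in prepareBin() is an assumption (any deterministic, non-constant fill satisfies testReadFileContent()), and makeArchive() is inferred from the archive-creation code this patch removes from testRelativePath() and testPathWithSpaces():

    // Assumed completion of the truncated prepareBin():
    private static byte[] prepareBin() {
      byte[] bb = new byte[77777];
      for (int i = 0; i < bb.length; i++) {
        // assumption: any deterministic, varied content works here:
        bb[i] = (byte) (i * 37);
      }
      return bb;
    }

    // Sketch of the makeArchive() helper the tests call, reconstructed
    // from the code removed from testRelativePath()/testPathWithSpaces();
    // assumed to return the full har:// path of the created archive:
    private String makeArchive() throws Exception {
      final String inputPathStr = inputPath.toUri().getPath();
      final URI uri = fs.getUri();
      final String prefix = "har://hdfs-" + uri.getHost() + ":" + uri.getPort()
          + archivePath.toUri().getPath() + Path.SEPARATOR;
      final String harName = "foo.har";
      final String[] args = { "-archiveName", harName, "-p", inputPathStr,
          "*", archivePath.toString() };
      System.setProperty(HadoopArchives.TEST_HADOOP_ARCHIVES_JAR_PATH,
          HADOOP_ARCHIVES_JAR);
      final HadoopArchives har = new HadoopArchives(conf);
      assertEquals(0, ToolRunner.run(har, args));
      return prefix + harName;
    }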