From 776deafb4c7b623f02ae1e31d5ba80e14726d665 Mon Sep 17 00:00:00 2001 From: Jonathan Turner Eagles Date: Wed, 23 Oct 2013 04:10:09 +0000 Subject: [PATCH 1/3] HADOOP-9598. Improve code coverage of RMAdminCLI (Aleksey Gorshkov and Andrey Klochkov via jeagles) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1534905 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/client/cli/RMAdminCLI.java | 2 +- .../hadoop/yarn/client/TestRMAdminCLI.java | 243 ++++++++++++++++++ 3 files changed, 247 insertions(+), 1 deletion(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3a3c9d6d577..44f6fec31b5 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -64,6 +64,9 @@ Release 2.3.0 - UNRELEASED YARN-1182. MiniYARNCluster creates and inits the RM/NM only on start() (Karthik Kambatla via Sandy Ryza) + HADOOP-9598. Improve code coverage of RMAdminCLI (Aleksey Gorshkov and + Andrey Klochkov via jeagles) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index 11335c0d8f6..5d1b5d66b5b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -162,7 +162,7 @@ private static void printUsage(String cmd) { } } - private ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException { + protected ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException { // Get the current configuration final YarnConfiguration conf = new YarnConfiguration(getConf()); return ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java new file mode 100644 index 00000000000..675a9be0ee8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; + +import org.apache.hadoop.yarn.client.cli.RMAdminCLI; +import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatcher; + +public class TestRMAdminCLI { + + private ResourceManagerAdministrationProtocol admin; + private RMAdminCLI rmAdminCLI; + + @Before + public void configure() { + admin = mock(ResourceManagerAdministrationProtocol.class); + rmAdminCLI = new RMAdminCLI() { + + @Override + protected ResourceManagerAdministrationProtocol createAdminProtocol() + throws IOException { + return admin; + } + + }; + } + + @Test(timeout=500) + public void testRefreshQueues() throws Exception { + String[] args = { "-refreshQueues" }; + assertEquals(0, rmAdminCLI.run(args)); + verify(admin).refreshQueues(any(RefreshQueuesRequest.class)); + } + + @Test(timeout=500) + public void testRefreshUserToGroupsMappings() throws Exception { + String[] args = { "-refreshUserToGroupsMappings" }; + assertEquals(0, rmAdminCLI.run(args)); + verify(admin).refreshUserToGroupsMappings( + any(RefreshUserToGroupsMappingsRequest.class)); + } + + @Test(timeout=500) + public void testRefreshSuperUserGroupsConfiguration() throws Exception { + String[] args = { "-refreshSuperUserGroupsConfiguration" }; + assertEquals(0, rmAdminCLI.run(args)); + verify(admin).refreshSuperUserGroupsConfiguration( + any(RefreshSuperUserGroupsConfigurationRequest.class)); + } + + @Test(timeout=500) + public void testRefreshAdminAcls() throws Exception { + String[] args = { "-refreshAdminAcls" }; + assertEquals(0, rmAdminCLI.run(args)); + verify(admin).refreshAdminAcls(any(RefreshAdminAclsRequest.class)); + } + + @Test(timeout=500) + public void testRefreshServiceAcl() throws Exception { + String[] args = { "-refreshServiceAcl" }; + assertEquals(0, rmAdminCLI.run(args)); + verify(admin).refreshServiceAcls(any(RefreshServiceAclsRequest.class)); + } + + @Test(timeout=500) + public void testRefreshNodes() throws Exception { + String[] args = { "-refreshNodes" }; + assertEquals(0, rmAdminCLI.run(args)); + verify(admin).refreshNodes(any(RefreshNodesRequest.class)); + } + + @Test(timeout=500) + public void testGetGroups() throws Exception { + when(admin.getGroupsForUser(eq("admin"))).thenReturn( + new String[] {"group1", "group2"}); + PrintStream origOut = System.out; + PrintStream out = mock(PrintStream.class); + System.setOut(out); + try { + String[] args = { "-getGroups", "admin" }; + assertEquals(0, rmAdminCLI.run(args)); + 
verify(admin).getGroupsForUser(eq("admin")); + verify(out).println(argThat(new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return ("" + argument).equals("admin : group1 group2"); + } + })); + } finally { + System.setOut(origOut); + } + } + + /** + * Test printing of help messages + */ + @Test(timeout=500) + public void testHelp() throws Exception { + PrintStream oldOutPrintStream = System.out; + PrintStream oldErrPrintStream = System.err; + ByteArrayOutputStream dataOut = new ByteArrayOutputStream(); + ByteArrayOutputStream dataErr = new ByteArrayOutputStream(); + System.setOut(new PrintStream(dataOut)); + System.setErr(new PrintStream(dataErr)); + try { + String[] args = { "-help" }; + assertEquals(0, rmAdminCLI.run(args)); + assertTrue(dataOut + .toString() + .contains( + "rmadmin is the command to execute Map-Reduce" + + " administrative commands.")); + assertTrue(dataOut + .toString() + .contains( + "hadoop rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" + + "UserGroupsConfiguration] [-refreshUserToGroupsMappings] " + + "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" + + " [username]] [-help [cmd]]")); + assertTrue(dataOut + .toString() + .contains( + "-refreshQueues: Reload the queues' acls, states and scheduler " + + "specific properties.")); + assertTrue(dataOut + .toString() + .contains( + "-refreshNodes: Refresh the hosts information at the " + + "ResourceManager.")); + assertTrue(dataOut.toString().contains( + "-refreshUserToGroupsMappings: Refresh user-to-groups mappings")); + assertTrue(dataOut + .toString() + .contains( + "-refreshSuperUserGroupsConfiguration: Refresh superuser proxy" + + " groups mappings")); + assertTrue(dataOut + .toString() + .contains( + "-refreshAdminAcls: Refresh acls for administration of " + + "ResourceManager")); + assertTrue(dataOut + .toString() + .contains( + "-refreshServiceAcl: Reload the service-level authorization" + + " policy file")); + assertTrue(dataOut + .toString() + .contains( + "-help [cmd]: \tDisplays help for the given command or all " + + "commands if none")); + + testError(new String[] { "-help", "-refreshQueues" }, + "Usage: java RMAdmin [-refreshQueues]", dataErr, 0); + testError(new String[] { "-help", "-refreshNodes" }, + "Usage: java RMAdmin [-refreshNodes]", dataErr, 0); + testError(new String[] { "-help", "-refreshUserToGroupsMappings" }, + "Usage: java RMAdmin [-refreshUserToGroupsMappings]", dataErr, 0); + testError( + new String[] { "-help", "-refreshSuperUserGroupsConfiguration" }, + "Usage: java RMAdmin [-refreshSuperUserGroupsConfiguration]", + dataErr, 0); + testError(new String[] { "-help", "-refreshAdminAcls" }, + "Usage: java RMAdmin [-refreshAdminAcls]", dataErr, 0); + testError(new String[] { "-help", "-refreshService" }, + "Usage: java RMAdmin [-refreshServiceAcl]", dataErr, 0); + testError(new String[] { "-help", "-getGroups" }, + "Usage: java RMAdmin [-getGroups [username]]", dataErr, 0); + + + testError(new String[] { "-help", "-badParameter" }, + "Usage: java RMAdmin", dataErr, 0); + testError(new String[] { "-badParameter" }, + "badParameter: Unknown command", dataErr, -1); + } finally { + System.setOut(oldOutPrintStream); + System.setErr(oldErrPrintStream); + } + } + + @Test(timeout=500) + public void testException() throws Exception { + PrintStream oldErrPrintStream = System.err; + ByteArrayOutputStream dataErr = new ByteArrayOutputStream(); + System.setErr(new PrintStream(dataErr)); + try { + when(admin.refreshQueues(any(RefreshQueuesRequest.class))) + 
.thenThrow(new IOException("test exception")); + String[] args = { "-refreshQueues" }; + + assertEquals(-1, rmAdminCLI.run(args)); + verify(admin).refreshQueues(any(RefreshQueuesRequest.class)); + assertTrue(dataErr.toString().contains("refreshQueues: test exception")); + } finally { + System.setErr(oldErrPrintStream); + } + } + + private void testError(String[] args, String template, + ByteArrayOutputStream data, int resultCode) throws Exception { + assertEquals(resultCode, rmAdminCLI.run(args)); + assertTrue(data.toString().contains(template)); + data.reset(); + } + +} From 82ed72d1d4f7263449ee75ebbd7d668be9f3bdc1 Mon Sep 17 00:00:00 2001 From: Aaron Myers Date: Wed, 23 Oct 2013 15:15:29 +0000 Subject: [PATCH 2/3] HDFS-5403. WebHdfs client cannot communicate with older WebHdfs servers post HDFS-5306. Contributed by Aaron T. Myers. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1535056 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++ .../org/apache/hadoop/hdfs/web/JsonUtil.java | 9 +++++-- .../apache/hadoop/hdfs/web/TestJsonUtil.java | 24 +++++++++++++++++++ 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index cfffa11abd0..b6a71f0b03d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -431,6 +431,9 @@ Release 2.2.1 - UNRELEASED HDFS-5347. Add HDFS NFS user guide. (brandonli) + HDFS-5403. WebHdfs client cannot communicate with older WebHdfs servers + post HDFS-5306. (atm) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java index 9fb6dfd3d0f..db4adc52476 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java @@ -297,10 +297,15 @@ private static Map toJsonMap(final DatanodeInfo datanodeinfo) { } /** Convert a Json map to an DatanodeInfo object. 
*/ - private static DatanodeInfo toDatanodeInfo(final Map m) { + static DatanodeInfo toDatanodeInfo(final Map m) { if (m == null) { return null; } + + Object infoSecurePort = m.get("infoSecurePort"); + if (infoSecurePort == null) { + infoSecurePort = 0l; // same as the default value in hdfs.proto + } return new DatanodeInfo( (String)m.get("ipAddr"), @@ -308,7 +313,7 @@ private static DatanodeInfo toDatanodeInfo(final Map m) { (String)m.get("storageID"), (int)(long)(Long)m.get("xferPort"), (int)(long)(Long)m.get("infoPort"), - (int)(long)(Long)m.get("infoSecurePort"), + (int)(long)(Long)infoSecurePort, (int)(long)(Long)m.get("ipcPort"), (Long)m.get("capacity"), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java index 5460047a2ed..0dc00a6aac2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.web; +import java.util.HashMap; import java.util.Map; import org.apache.hadoop.fs.FileStatus; @@ -58,4 +59,27 @@ public void testHdfsFileStatus() { System.out.println("fs2 = " + fs2); Assert.assertEquals(fstatus, fs2); } + + @Test + public void testToDatanodeInfoWithoutSecurePort() { + Map response = new HashMap(); + + response.put("ipAddr", "127.0.0.1"); + response.put("hostName", "localhost"); + response.put("storageID", "fake-id"); + response.put("xferPort", 1337l); + response.put("infoPort", 1338l); + // deliberately don't include an entry for "infoSecurePort" + response.put("ipcPort", 1339l); + response.put("capacity", 1024l); + response.put("dfsUsed", 512l); + response.put("remaining", 512l); + response.put("blockPoolUsed", 512l); + response.put("lastUpdate", 0l); + response.put("xceiverCount", 4096l); + response.put("networkLocation", "foo.bar.baz"); + response.put("adminState", "NORMAL"); + + JsonUtil.toDatanodeInfo(response); + } } From f09c52bb7c0d248e3b5981a705ad9ccd132e8441 Mon Sep 17 00:00:00 2001 From: Jonathan Turner Eagles Date: Wed, 23 Oct 2013 18:03:08 +0000 Subject: [PATCH 3/3] HADOOP-9016. HarFsInputStream.skip(long) must never return negative value. (Ivan A. Veselovsky via jeagles) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1535086 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../org/apache/hadoop/fs/HarFileSystem.java | 58 +- .../hadoop/tools/TestHadoopArchives.java | 575 +++++++++++++++--- 3 files changed, 552 insertions(+), 84 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 204715631af..0756bc7b791 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -411,6 +411,9 @@ Release 2.3.0 - UNRELEASED HADOOP-9981. globStatus should minimize its listStatus and getFileStatus calls. (Contributed by Colin Patrick McCabe) + HADOOP-9016. HarFsInputStream.skip(long) must never return negative value. + (Ivan A. 
Veselovsky via jeagles) + Release 2.2.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java index afa45246f5e..091b35a846a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java @@ -898,11 +898,15 @@ private static class HarFsInputStream extends FSInputStream private long position, start, end; //The underlying data input stream that the // underlying filesystem will return. - private FSDataInputStream underLyingStream; + private final FSDataInputStream underLyingStream; //one byte buffer - private byte[] oneBytebuff = new byte[1]; + private final byte[] oneBytebuff = new byte[1]; + HarFsInputStream(FileSystem fs, Path path, long start, long length, int bufferSize) throws IOException { + if (length < 0) { + throw new IllegalArgumentException("Negative length ["+length+"]"); + } underLyingStream = fs.open(path, bufferSize); underLyingStream.seek(start); // the start of this file in the part file @@ -916,7 +920,7 @@ private static class HarFsInputStream extends FSInputStream @Override public synchronized int available() throws IOException { long remaining = end - underLyingStream.getPos(); - if (remaining > (long)Integer.MAX_VALUE) { + if (remaining > Integer.MAX_VALUE) { return Integer.MAX_VALUE; } return (int) remaining; @@ -948,10 +952,14 @@ public synchronized int read() throws IOException { return (ret <= 0) ? -1: (oneBytebuff[0] & 0xff); } + // NB: currently this method is actually never executed because + // java.io.DataInputStream.read(byte[]) directly delegates to + // method java.io.InputStream.read(byte[], int, int). + // However, potentially it can be invoked, so leave it intact for now. @Override public synchronized int read(byte[] b) throws IOException { - int ret = read(b, 0, b.length); - if (ret != -1) { + final int ret = read(b, 0, b.length); + if (ret > 0) { position += ret; } return ret; @@ -980,15 +988,19 @@ public synchronized int read(byte[] b, int offset, int len) public synchronized long skip(long n) throws IOException { long tmpN = n; if (tmpN > 0) { - if (position + tmpN > end) { - tmpN = end - position; - } + final long actualRemaining = end - position; + if (tmpN > actualRemaining) { + tmpN = actualRemaining; + } underLyingStream.seek(tmpN + position); position += tmpN; return tmpN; - } - return (tmpN < 0)? -1 : 0; - } + } + // NB: the contract is described in java.io.InputStream.skip(long): + // this method returns the number of bytes actually skipped, so, + // the return value should never be negative.
+ return 0; + } @Override public synchronized long getPos() throws IOException { @@ -996,14 +1008,23 @@ public synchronized long getPos() throws IOException { } @Override - public synchronized void seek(long pos) throws IOException { - if (pos < 0 || (start + pos > end)) { - throw new IOException("Failed to seek: EOF"); - } + public synchronized void seek(final long pos) throws IOException { + validatePosition(pos); position = start + pos; underLyingStream.seek(position); } + private void validatePosition(final long pos) throws IOException { + if (pos < 0) { + throw new IOException("Negative position: "+pos); + } + final long length = end - start; + if (pos > length) { + throw new IOException("Position behind the end " + + "of the stream (length = "+length+"): " + pos); + } + } + @Override public boolean seekToNewSource(long targetPos) throws IOException { // do not need to implement this @@ -1020,7 +1041,12 @@ public int read(long pos, byte[] b, int offset, int length) throws IOException { int nlength = length; if (start + nlength + pos > end) { - nlength = (int) (end - (start + pos)); + // length corrected to the real remaining length: + nlength = (int) (end - start - pos); + } + if (nlength <= 0) { + // EOS: + return -1; } return underLyingStream.read(pos + start , b, offset, nlength); } diff --git a/hadoop-tools/hadoop-archives/src/test/java/org/apache/hadoop/tools/TestHadoopArchives.java b/hadoop-tools/hadoop-archives/src/test/java/org/apache/hadoop/tools/TestHadoopArchives.java index b6310fd91b1..65bbbe451bf 100644 --- a/hadoop-tools/hadoop-archives/src/test/java/org/apache/hadoop/tools/TestHadoopArchives.java +++ b/hadoop-tools/hadoop-archives/src/test/java/org/apache/hadoop/tools/TestHadoopArchives.java @@ -19,6 +19,7 @@ package org.apache.hadoop.tools; import java.io.ByteArrayOutputStream; +import java.io.FilterInputStream; import java.io.IOException; import java.io.PrintStream; import java.net.URI; @@ -30,9 +31,13 @@ import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.fs.HarFileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.IOUtils; @@ -42,6 +47,7 @@ import org.apache.log4j.Level; import org.junit.After; import org.junit.Assert; +import static org.junit.Assert.*; import org.junit.Before; import org.junit.Test; @@ -62,19 +68,36 @@ public class TestHadoopArchives { private static final String inputDir = "input"; private Path inputPath; + private Path archivePath; + private final List fileList = new ArrayList(); private MiniDFSCluster dfscluster; private Configuration conf; private FileSystem fs; - private Path archivePath; - static private Path createFile(Path dir, String filename, FileSystem fs) - throws IOException { - final Path f = new Path(dir, filename); + private static String createFile(Path root, FileSystem fs, String... dirsAndFile + ) throws IOException { + String fileBaseName = dirsAndFile[dirsAndFile.length - 1]; + return createFile(root, fs, fileBaseName.getBytes("UTF-8"), dirsAndFile); + } + + private static String createFile(Path root, FileSystem fs, byte[] fileContent, String... 
dirsAndFile + ) throws IOException { + StringBuilder sb = new StringBuilder(); + for (String segment: dirsAndFile) { + if (sb.length() > 0) { + sb.append(Path.SEPARATOR); + } + sb.append(segment); + } + final Path f = new Path(root, sb.toString()); final FSDataOutputStream out = fs.create(f); - out.write(filename.getBytes()); - out.close(); - return f; + try { + out.write(fileContent); + } finally { + out.close(); + } + return sb.toString(); } @Before @@ -86,102 +109,80 @@ public void setUp() throws Exception { conf.set(CapacitySchedulerConfiguration.PREFIX + CapacitySchedulerConfiguration.ROOT + ".default." + CapacitySchedulerConfiguration.CAPACITY, "100"); - dfscluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true) - .build(); + dfscluster = new MiniDFSCluster + .Builder(conf) + .checkExitOnShutdown(true) + .numDataNodes(2) + .format(true) + .racks(null) + .build(); fs = dfscluster.getFileSystem(); - inputPath = new Path(fs.getHomeDirectory(), inputDir); + + // prepare archive path: archivePath = new Path(fs.getHomeDirectory(), "archive"); + fs.delete(archivePath, true); + + // prepare input path: + inputPath = new Path(fs.getHomeDirectory(), inputDir); + fs.delete(inputPath, true); fs.mkdirs(inputPath); - createFile(inputPath, "a", fs); - createFile(inputPath, "b", fs); - createFile(inputPath, "c", fs); + // create basic input files: + fileList.add(createFile(inputPath, fs, "a")); + fileList.add(createFile(inputPath, fs, "b")); + fileList.add(createFile(inputPath, fs, "c")); } @After public void tearDown() throws Exception { - try { - if (dfscluster != null) { - dfscluster.shutdown(); - } - if (dfscluster != null) { - dfscluster.shutdown(); - } - } catch (Exception e) { - System.err.println(e); + if (dfscluster != null) { + dfscluster.shutdown(); } } @Test public void testRelativePath() throws Exception { - fs.delete(archivePath, true); - final Path sub1 = new Path(inputPath, "dir1"); fs.mkdirs(sub1); - createFile(sub1, "a", fs); + createFile(inputPath, fs, sub1.getName(), "a"); final FsShell shell = new FsShell(conf); final List originalPaths = lsr(shell, "input"); - System.out.println("originalPath: " + originalPaths); - final URI uri = fs.getUri(); - final String prefix = "har://hdfs-" + uri.getHost() + ":" + uri.getPort() - + archivePath.toUri().getPath() + Path.SEPARATOR; + System.out.println("originalPaths: " + originalPaths); - { - final String harName = "foo.har"; - final String[] args = { "-archiveName", harName, "-p", "input", "*", - "archive" }; - System.setProperty(HadoopArchives.TEST_HADOOP_ARCHIVES_JAR_PATH, - HADOOP_ARCHIVES_JAR); - final HadoopArchives har = new HadoopArchives(conf); - Assert.assertEquals(0, ToolRunner.run(har, args)); + // make the archive: + final String fullHarPathStr = makeArchive(); - // compare results - final List harPaths = lsr(shell, prefix + harName); - Assert.assertEquals(originalPaths, harPaths); - } + // compare results: + final List harPaths = lsr(shell, fullHarPathStr); + Assert.assertEquals(originalPaths, harPaths); } @Test public void testPathWithSpaces() throws Exception { - fs.delete(archivePath, true); - // create files/directories with spaces - createFile(inputPath, "c c", fs); + createFile(inputPath, fs, "c c"); final Path sub1 = new Path(inputPath, "sub 1"); fs.mkdirs(sub1); - createFile(sub1, "file x y z", fs); - createFile(sub1, "file", fs); - createFile(sub1, "x", fs); - createFile(sub1, "y", fs); - createFile(sub1, "z", fs); + createFile(sub1, fs, "file x y z"); + createFile(sub1, fs, "file"); + 
createFile(sub1, fs, "x"); + createFile(sub1, fs, "y"); + createFile(sub1, fs, "z"); final Path sub2 = new Path(inputPath, "sub 1 with suffix"); fs.mkdirs(sub2); - createFile(sub2, "z", fs); + createFile(sub2, fs, "z"); final FsShell shell = new FsShell(conf); - final String inputPathStr = inputPath.toUri().getPath(); - final List originalPaths = lsr(shell, inputPathStr); - final URI uri = fs.getUri(); - final String prefix = "har://hdfs-" + uri.getHost() + ":" + uri.getPort() - + archivePath.toUri().getPath() + Path.SEPARATOR; - {// Enable space replacement - final String harName = "foo.har"; - final String[] args = { "-archiveName", harName, "-p", inputPathStr, "*", - archivePath.toString() }; - System.setProperty(HadoopArchives.TEST_HADOOP_ARCHIVES_JAR_PATH, - HADOOP_ARCHIVES_JAR); - final HadoopArchives har = new HadoopArchives(conf); - Assert.assertEquals(0, ToolRunner.run(har, args)); - - // compare results - final List harPaths = lsr(shell, prefix + harName); - Assert.assertEquals(originalPaths, harPaths); - } + // make the archive: + final String fullHarPathStr = makeArchive(); + // compare results + final List harPaths = lsr(shell, fullHarPathStr); + Assert.assertEquals(originalPaths, harPaths); } private static List lsr(final FsShell shell, String dir) @@ -222,4 +223,442 @@ private static List lsr(final FsShell shell, String dir) .println("lsr paths = " + paths.toString().replace(", ", ",\n ")); return paths; } + + @Test + public void testReadFileContent() throws Exception { + fileList.add(createFile(inputPath, fs, "c c")); + final Path sub1 = new Path(inputPath, "sub 1"); + fs.mkdirs(sub1); + fileList.add(createFile(inputPath, fs, sub1.getName(), "file x y z")); + fileList.add(createFile(inputPath, fs, sub1.getName(), "file")); + fileList.add(createFile(inputPath, fs, sub1.getName(), "x")); + fileList.add(createFile(inputPath, fs, sub1.getName(), "y")); + fileList.add(createFile(inputPath, fs, sub1.getName(), "z")); + final Path sub2 = new Path(inputPath, "sub 1 with suffix"); + fs.mkdirs(sub2); + fileList.add(createFile(inputPath, fs, sub2.getName(), "z")); + // Generate a big binary file content: + final byte[] binContent = prepareBin(); + fileList.add(createFile(inputPath, fs, binContent, sub2.getName(), "bin")); + fileList.add(createFile(inputPath, fs, new byte[0], sub2.getName(), "zero-length")); + + final String fullHarPathStr = makeArchive(); + + // Create fresh HarFs: + final HarFileSystem harFileSystem = new HarFileSystem(fs); + try { + final URI harUri = new URI(fullHarPathStr); + harFileSystem.initialize(harUri, fs.getConf()); + // now read the file content and compare it against the expected: + int readFileCount = 0; + for (final String pathStr0 : fileList) { + final Path path = new Path(fullHarPathStr + Path.SEPARATOR + pathStr0); + final String baseName = path.getName(); + final FileStatus status = harFileSystem.getFileStatus(path); + if (status.isFile()) { + // read the file: + final byte[] actualContentSimple = readAllSimple( + harFileSystem.open(path), true); + + final byte[] actualContentBuffer = readAllWithBuffer( + harFileSystem.open(path), true); + assertArrayEquals(actualContentSimple, actualContentBuffer); + + final byte[] actualContentFully = readAllWithReadFully( + actualContentSimple.length, + harFileSystem.open(path), true); + assertArrayEquals(actualContentSimple, actualContentFully); + + final byte[] actualContentSeek = readAllWithSeek( + actualContentSimple.length, + harFileSystem.open(path), true); + assertArrayEquals(actualContentSimple, 
actualContentSeek); + + final byte[] actualContentRead4 + = readAllWithRead4(harFileSystem.open(path), true); + assertArrayEquals(actualContentSimple, actualContentRead4); + + final byte[] actualContentSkip = readAllWithSkip( + actualContentSimple.length, + harFileSystem.open(path), + harFileSystem.open(path), + true); + assertArrayEquals(actualContentSimple, actualContentSkip); + + if ("bin".equals(baseName)) { + assertArrayEquals(binContent, actualContentSimple); + } else if ("zero-length".equals(baseName)) { + assertEquals(0, actualContentSimple.length); + } else { + String actual = new String(actualContentSimple, "UTF-8"); + assertEquals(baseName, actual); + } + readFileCount++; + } + } + assertEquals(fileList.size(), readFileCount); + } finally { + harFileSystem.close(); + } + } + + private static byte[] readAllSimple(FSDataInputStream fsdis, boolean close) throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + int b; + while (true) { + b = fsdis.read(); + if (b < 0) { + break; + } else { + baos.write(b); + } + } + baos.close(); + return baos.toByteArray(); + } finally { + if (close) { + fsdis.close(); + } + } + } + + private static byte[] readAllWithBuffer(FSDataInputStream fsdis, boolean close) + throws IOException { + try { + final int available = fsdis.available(); + final byte[] buffer; + final ByteArrayOutputStream baos; + if (available < 0) { + buffer = new byte[1024]; + baos = new ByteArrayOutputStream(buffer.length * 2); + } else { + buffer = new byte[available]; + baos = new ByteArrayOutputStream(available); + } + int readIntoBuffer = 0; + int read; + while (true) { + read = fsdis.read(buffer, readIntoBuffer, buffer.length - readIntoBuffer); + if (read < 0) { + // end of stream: + if (readIntoBuffer > 0) { + baos.write(buffer, 0, readIntoBuffer); + } + return baos.toByteArray(); + } else { + readIntoBuffer += read; + if (readIntoBuffer == buffer.length) { + // buffer is full, need to clean the buffer. 
+ // drop the buffered data to baos: + baos.write(buffer); + // reset the counter to start reading to the buffer beginning: + readIntoBuffer = 0; + } else if (readIntoBuffer > buffer.length) { + throw new IOException("Read more than the buffer length: " + + readIntoBuffer + ", buffer length = " + buffer.length); + } + } + } + } finally { + if (close) { + fsdis.close(); + } + } + } + + private static byte[] readAllWithReadFully(int totalLength, FSDataInputStream fsdis, boolean close) + throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + // Simulate reading of some data structures of known length: + final byte[] buffer = new byte[17]; + final int times = totalLength / buffer.length; + final int remainder = totalLength % buffer.length; + // it would be simpler to leave the position tracking to the + // InputStream, but we need to check the methods #readFully(2) + // and #readFully(4) that receive the position as a parameter: + int position = 0; + try { + // read "data structures": + for (int i=0; i 0) { + // read the remainder: + fsdis.readFully(position, buffer, 0, remainder); + position += remainder; + baos.write(buffer, 0, remainder); + } + try { + fsdis.readFully(position, buffer, 0, 1); + assertTrue(false); + } catch (IOException ioe) { + // okay + } + assertEquals(totalLength, position); + final byte[] result = baos.toByteArray(); + assertEquals(totalLength, result.length); + return result; + } finally { + if (close) { + fsdis.close(); + } + } + } + + private static byte[] readAllWithRead4(FSDataInputStream fsdis, boolean close) + throws IOException { + try { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final byte[] buffer = new byte[17]; + int totalRead = 0; + int read; + while (true) { + read = fsdis.read(totalRead, buffer, 0, buffer.length); + if (read > 0) { + totalRead += read; + baos.write(buffer, 0, read); + } else if (read < 0) { + break; // EOF + } else { + // read == 0: + // zero result may be returned *only* in case if the 4th + // parameter is 0. 
Since in our case this is 'buffer.length', + // zero return value clearly indicates a bug: + throw new AssertionError("FSDataInputStream#read(4) returned 0, while " + + " the 4th method parameter is " + buffer.length + "."); + } + } + final byte[] result = baos.toByteArray(); + return result; + } finally { + if (close) { + fsdis.close(); + } + } + } + + private static byte[] readAllWithSeek(final int totalLength, + final FSDataInputStream fsdis, final boolean close) + throws IOException { + final byte[] result = new byte[totalLength]; + long pos; + try { + // read the data in the reverse order, from + // the tail to the head by pieces of 'buffer' length: + final byte[] buffer = new byte[17]; + final int times = totalLength / buffer.length; + int read; + int expectedRead; + for (int i=times; i>=0; i--) { + pos = i * buffer.length; + fsdis.seek(pos); + // check that seek is successful: + assertEquals(pos, fsdis.getPos()); + read = fsdis.read(buffer); + // check we read right number of bytes: + if (i == times) { + expectedRead = totalLength % buffer.length; // remainder + if (expectedRead == 0) { + // zero remainder corresponds to the EOS, so + // by the contract of DataInputStream#read(byte[]) -1 should be + // returned: + expectedRead = -1; + } + } else { + expectedRead = buffer.length; + } + assertEquals(expectedRead, read); + if (read > 0) { + System.arraycopy(buffer, 0, result, (int)pos, read); + } + } + + // finally, check that #seek() to a non-existing position leads to IOE: + expectSeekIOE(fsdis, Long.MAX_VALUE, "Seek to Long.MAX_VALUE should lead to IOE."); + expectSeekIOE(fsdis, Long.MIN_VALUE, "Seek to Long.MIN_VALUE should lead to IOE."); + long pp = -1L; + expectSeekIOE(fsdis, pp, "Seek to "+pp+" should lead to IOE."); + + // NB: it is *possible* to #seek(length), but *impossible* to #seek(length + 1): + fsdis.seek(totalLength); + assertEquals(totalLength, fsdis.getPos()); + pp = totalLength + 1; + expectSeekIOE(fsdis, pp, "Seek to the length position + 1 ("+pp+") should lead to IOE."); + + return result; + } finally { + if (close) { + fsdis.close(); + } + } + } + + private static void expectSeekIOE(FSDataInputStream fsdis, long seekPos, String message) { + try { + fsdis.seek(seekPos); + assertTrue(message + " (Position = " + fsdis.getPos() + ")", false); + } catch (IOException ioe) { + // okay + } + } + + /* + * Reads data by chunks from 2 input streams: + * reads chunk from stream 1, and skips this chunk in the stream 2; + * Then reads next chunk from stream 2, and skips this chunk in stream 1. + */ + private static byte[] readAllWithSkip( + final int totalLength, + final FSDataInputStream fsdis1, + final FSDataInputStream fsdis2, + final boolean close) + throws IOException { + // test negative skip arg: + assertEquals(0, fsdis1.skip(-1)); + // test zero skip arg: + assertEquals(0, fsdis1.skip(0)); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(totalLength); + try { + // read the data in the reverse order, from + // the tail to the head by pieces of 'buffer' length: + final byte[] buffer = new byte[17]; + final int times = totalLength / buffer.length; + final int remainder = totalLength % buffer.length; + long skipped; + long expectedPosition; + int toGo; + for (int i=0; i<=times; i++) { + toGo = (i < times) ? 
buffer.length : remainder; + if (i % 2 == 0) { + fsdis1.readFully(buffer, 0, toGo); + skipped = skipUntilZero(fsdis2, toGo); + } else { + fsdis2.readFully(buffer, 0, toGo); + skipped = skipUntilZero(fsdis1, toGo); + } + if (i < times) { + assertEquals(buffer.length, skipped); + expectedPosition = (i + 1) * buffer.length; + } else { + // remainder: + if (remainder > 0) { + assertEquals(remainder, skipped); + } else { + assertEquals(0, skipped); + } + expectedPosition = totalLength; + } + // check if the 2 streams have equal and correct positions: + assertEquals(expectedPosition, fsdis1.getPos()); + assertEquals(expectedPosition, fsdis2.getPos()); + // save the read data: + if (toGo > 0) { + baos.write(buffer, 0, toGo); + } + } + + // finally, check that the ended stream cannot skip: + assertEquals(0, fsdis1.skip(-1)); + assertEquals(0, fsdis1.skip(0)); + assertEquals(0, fsdis1.skip(1)); + assertEquals(0, fsdis1.skip(Long.MAX_VALUE)); + + return baos.toByteArray(); + } finally { + if (close) { + fsdis1.close(); + fsdis2.close(); + } + } + } + + private static long skipUntilZero(final FilterInputStream fis, + final long toSkip) throws IOException { + long skipped = 0; + long remainsToSkip = toSkip; + long s; + while (skipped < toSkip) { + s = fis.skip(remainsToSkip); // actually skipped + if (s == 0) { + return skipped; // EOF or impossible to skip. + } + skipped += s; + remainsToSkip -= s; + } + return skipped; + } + + private static byte[] prepareBin() { + byte[] bb = new byte[77777]; + for (int i=0; i