Merge r1534894 through r1535121 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1535122 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
commit
b442fe92fb
|
@ -411,6 +411,9 @@ Release 2.3.0 - UNRELEASED
|
||||||
HADOOP-9981. globStatus should minimize its listStatus and getFileStatus
|
HADOOP-9981. globStatus should minimize its listStatus and getFileStatus
|
||||||
calls. (Contributed by Colin Patrick McCabe)
|
calls. (Contributed by Colin Patrick McCabe)
|
||||||
|
|
||||||
|
HADOOP-9016. HarFsInputStream.skip(long) must never return negative value.
|
||||||
|
(Ivan A. Veselovsky via jeagles)
|
||||||
|
|
||||||
Release 2.2.1 - UNRELEASED
|
Release 2.2.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -898,11 +898,15 @@ public class HarFileSystem extends FileSystem {
|
||||||
private long position, start, end;
|
private long position, start, end;
|
||||||
//The underlying data input stream that the
|
//The underlying data input stream that the
|
||||||
// underlying filesystem will return.
|
// underlying filesystem will return.
|
||||||
private FSDataInputStream underLyingStream;
|
private final FSDataInputStream underLyingStream;
|
||||||
//one byte buffer
|
//one byte buffer
|
||||||
private byte[] oneBytebuff = new byte[1];
|
private final byte[] oneBytebuff = new byte[1];
|
||||||
|
|
||||||
HarFsInputStream(FileSystem fs, Path path, long start,
|
HarFsInputStream(FileSystem fs, Path path, long start,
|
||||||
long length, int bufferSize) throws IOException {
|
long length, int bufferSize) throws IOException {
|
||||||
|
if (length < 0) {
|
||||||
|
throw new IllegalArgumentException("Negative length ["+length+"]");
|
||||||
|
}
|
||||||
underLyingStream = fs.open(path, bufferSize);
|
underLyingStream = fs.open(path, bufferSize);
|
||||||
underLyingStream.seek(start);
|
underLyingStream.seek(start);
|
||||||
// the start of this file in the part file
|
// the start of this file in the part file
|
||||||
|
@ -916,7 +920,7 @@ public class HarFileSystem extends FileSystem {
|
||||||
@Override
|
@Override
|
||||||
public synchronized int available() throws IOException {
|
public synchronized int available() throws IOException {
|
||||||
long remaining = end - underLyingStream.getPos();
|
long remaining = end - underLyingStream.getPos();
|
||||||
if (remaining > (long)Integer.MAX_VALUE) {
|
if (remaining > Integer.MAX_VALUE) {
|
||||||
return Integer.MAX_VALUE;
|
return Integer.MAX_VALUE;
|
||||||
}
|
}
|
||||||
return (int) remaining;
|
return (int) remaining;
|
||||||
|
@ -948,10 +952,14 @@ public class HarFileSystem extends FileSystem {
|
||||||
return (ret <= 0) ? -1: (oneBytebuff[0] & 0xff);
|
return (ret <= 0) ? -1: (oneBytebuff[0] & 0xff);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NB: currently this method actually never executed becusae
|
||||||
|
// java.io.DataInputStream.read(byte[]) directly delegates to
|
||||||
|
// method java.io.InputStream.read(byte[], int, int).
|
||||||
|
// However, potentially it can be invoked, so leave it intact for now.
|
||||||
@Override
|
@Override
|
||||||
public synchronized int read(byte[] b) throws IOException {
|
public synchronized int read(byte[] b) throws IOException {
|
||||||
int ret = read(b, 0, b.length);
|
final int ret = read(b, 0, b.length);
|
||||||
if (ret != -1) {
|
if (ret > 0) {
|
||||||
position += ret;
|
position += ret;
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -980,15 +988,19 @@ public class HarFileSystem extends FileSystem {
|
||||||
public synchronized long skip(long n) throws IOException {
|
public synchronized long skip(long n) throws IOException {
|
||||||
long tmpN = n;
|
long tmpN = n;
|
||||||
if (tmpN > 0) {
|
if (tmpN > 0) {
|
||||||
if (position + tmpN > end) {
|
final long actualRemaining = end - position;
|
||||||
tmpN = end - position;
|
if (tmpN > actualRemaining) {
|
||||||
}
|
tmpN = actualRemaining;
|
||||||
|
}
|
||||||
underLyingStream.seek(tmpN + position);
|
underLyingStream.seek(tmpN + position);
|
||||||
position += tmpN;
|
position += tmpN;
|
||||||
return tmpN;
|
return tmpN;
|
||||||
}
|
}
|
||||||
return (tmpN < 0)? -1 : 0;
|
// NB: the contract is described in java.io.InputStream.skip(long):
|
||||||
}
|
// this method returns the number of bytes actually skipped, so,
|
||||||
|
// the return value should never be negative.
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized long getPos() throws IOException {
|
public synchronized long getPos() throws IOException {
|
||||||
|
@ -996,14 +1008,23 @@ public class HarFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void seek(long pos) throws IOException {
|
public synchronized void seek(final long pos) throws IOException {
|
||||||
if (pos < 0 || (start + pos > end)) {
|
validatePosition(pos);
|
||||||
throw new IOException("Failed to seek: EOF");
|
|
||||||
}
|
|
||||||
position = start + pos;
|
position = start + pos;
|
||||||
underLyingStream.seek(position);
|
underLyingStream.seek(position);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void validatePosition(final long pos) throws IOException {
|
||||||
|
if (pos < 0) {
|
||||||
|
throw new IOException("Negative position: "+pos);
|
||||||
|
}
|
||||||
|
final long length = end - start;
|
||||||
|
if (pos > length) {
|
||||||
|
throw new IOException("Position behind the end " +
|
||||||
|
"of the stream (length = "+length+"): " + pos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean seekToNewSource(long targetPos) throws IOException {
|
public boolean seekToNewSource(long targetPos) throws IOException {
|
||||||
// do not need to implement this
|
// do not need to implement this
|
||||||
|
@ -1020,7 +1041,12 @@ public class HarFileSystem extends FileSystem {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
int nlength = length;
|
int nlength = length;
|
||||||
if (start + nlength + pos > end) {
|
if (start + nlength + pos > end) {
|
||||||
nlength = (int) (end - (start + pos));
|
// length corrected to the real remaining length:
|
||||||
|
nlength = (int) (end - start - pos);
|
||||||
|
}
|
||||||
|
if (nlength <= 0) {
|
||||||
|
// EOS:
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
return underLyingStream.read(pos + start , b, offset, nlength);
|
return underLyingStream.read(pos + start , b, offset, nlength);
|
||||||
}
|
}
|
||||||
|
|
|
@ -431,6 +431,9 @@ Release 2.2.1 - UNRELEASED
|
||||||
|
|
||||||
HDFS-5347. Add HDFS NFS user guide. (brandonli)
|
HDFS-5347. Add HDFS NFS user guide. (brandonli)
|
||||||
|
|
||||||
|
HDFS-5403. WebHdfs client cannot communicate with older WebHdfs servers
|
||||||
|
post HDFS-5306. (atm)
|
||||||
|
|
||||||
Release 2.2.0 - 2013-10-13
|
Release 2.2.0 - 2013-10-13
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -298,10 +298,15 @@ public class JsonUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert a Json map to an DatanodeInfo object. */
|
/** Convert a Json map to an DatanodeInfo object. */
|
||||||
private static DatanodeInfo toDatanodeInfo(final Map<?, ?> m) {
|
static DatanodeInfo toDatanodeInfo(final Map<?, ?> m) {
|
||||||
if (m == null) {
|
if (m == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Object infoSecurePort = m.get("infoSecurePort");
|
||||||
|
if (infoSecurePort == null) {
|
||||||
|
infoSecurePort = 0l; // same as the default value in hdfs.proto
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: Fix storageID
|
// TODO: Fix storageID
|
||||||
return new DatanodeInfo(
|
return new DatanodeInfo(
|
||||||
|
@ -310,7 +315,7 @@ public class JsonUtil {
|
||||||
(String)m.get("storageID"),
|
(String)m.get("storageID"),
|
||||||
(int)(long)(Long)m.get("xferPort"),
|
(int)(long)(Long)m.get("xferPort"),
|
||||||
(int)(long)(Long)m.get("infoPort"),
|
(int)(long)(Long)m.get("infoPort"),
|
||||||
(int)(long)(Long)m.get("infoSecurePort"),
|
(int)(long)(Long)infoSecurePort,
|
||||||
(int)(long)(Long)m.get("ipcPort"),
|
(int)(long)(Long)m.get("ipcPort"),
|
||||||
|
|
||||||
(Long)m.get("capacity"),
|
(Long)m.get("capacity"),
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.web;
|
package org.apache.hadoop.hdfs.web;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
@ -58,4 +59,27 @@ public class TestJsonUtil {
|
||||||
System.out.println("fs2 = " + fs2);
|
System.out.println("fs2 = " + fs2);
|
||||||
Assert.assertEquals(fstatus, fs2);
|
Assert.assertEquals(fstatus, fs2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testToDatanodeInfoWithoutSecurePort() {
|
||||||
|
Map<String, Object> response = new HashMap<String, Object>();
|
||||||
|
|
||||||
|
response.put("ipAddr", "127.0.0.1");
|
||||||
|
response.put("hostName", "localhost");
|
||||||
|
response.put("storageID", "fake-id");
|
||||||
|
response.put("xferPort", 1337l);
|
||||||
|
response.put("infoPort", 1338l);
|
||||||
|
// deliberately don't include an entry for "infoSecurePort"
|
||||||
|
response.put("ipcPort", 1339l);
|
||||||
|
response.put("capacity", 1024l);
|
||||||
|
response.put("dfsUsed", 512l);
|
||||||
|
response.put("remaining", 512l);
|
||||||
|
response.put("blockPoolUsed", 512l);
|
||||||
|
response.put("lastUpdate", 0l);
|
||||||
|
response.put("xceiverCount", 4096l);
|
||||||
|
response.put("networkLocation", "foo.bar.baz");
|
||||||
|
response.put("adminState", "NORMAL");
|
||||||
|
|
||||||
|
JsonUtil.toDatanodeInfo(response);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.tools;
|
package org.apache.hadoop.tools;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.FilterInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
@ -30,9 +31,13 @@ import java.util.StringTokenizer;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FsShell;
|
import org.apache.hadoop.fs.FsShell;
|
||||||
|
import org.apache.hadoop.fs.HarFileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
@ -42,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import static org.junit.Assert.*;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -62,19 +68,36 @@ public class TestHadoopArchives {
|
||||||
private static final String inputDir = "input";
|
private static final String inputDir = "input";
|
||||||
|
|
||||||
private Path inputPath;
|
private Path inputPath;
|
||||||
|
private Path archivePath;
|
||||||
|
private final List<String> fileList = new ArrayList<String>();
|
||||||
private MiniDFSCluster dfscluster;
|
private MiniDFSCluster dfscluster;
|
||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private FileSystem fs;
|
private FileSystem fs;
|
||||||
private Path archivePath;
|
|
||||||
|
|
||||||
static private Path createFile(Path dir, String filename, FileSystem fs)
|
private static String createFile(Path root, FileSystem fs, String... dirsAndFile
|
||||||
throws IOException {
|
) throws IOException {
|
||||||
final Path f = new Path(dir, filename);
|
String fileBaseName = dirsAndFile[dirsAndFile.length - 1];
|
||||||
|
return createFile(root, fs, fileBaseName.getBytes("UTF-8"), dirsAndFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String createFile(Path root, FileSystem fs, byte[] fileContent, String... dirsAndFile
|
||||||
|
) throws IOException {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for (String segment: dirsAndFile) {
|
||||||
|
if (sb.length() > 0) {
|
||||||
|
sb.append(Path.SEPARATOR);
|
||||||
|
}
|
||||||
|
sb.append(segment);
|
||||||
|
}
|
||||||
|
final Path f = new Path(root, sb.toString());
|
||||||
final FSDataOutputStream out = fs.create(f);
|
final FSDataOutputStream out = fs.create(f);
|
||||||
out.write(filename.getBytes());
|
try {
|
||||||
out.close();
|
out.write(fileContent);
|
||||||
return f;
|
} finally {
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -86,102 +109,80 @@ public class TestHadoopArchives {
|
||||||
conf.set(CapacitySchedulerConfiguration.PREFIX
|
conf.set(CapacitySchedulerConfiguration.PREFIX
|
||||||
+ CapacitySchedulerConfiguration.ROOT + ".default."
|
+ CapacitySchedulerConfiguration.ROOT + ".default."
|
||||||
+ CapacitySchedulerConfiguration.CAPACITY, "100");
|
+ CapacitySchedulerConfiguration.CAPACITY, "100");
|
||||||
dfscluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true)
|
dfscluster = new MiniDFSCluster
|
||||||
.build();
|
.Builder(conf)
|
||||||
|
.checkExitOnShutdown(true)
|
||||||
|
.numDataNodes(2)
|
||||||
|
.format(true)
|
||||||
|
.racks(null)
|
||||||
|
.build();
|
||||||
|
|
||||||
fs = dfscluster.getFileSystem();
|
fs = dfscluster.getFileSystem();
|
||||||
inputPath = new Path(fs.getHomeDirectory(), inputDir);
|
|
||||||
|
// prepare archive path:
|
||||||
archivePath = new Path(fs.getHomeDirectory(), "archive");
|
archivePath = new Path(fs.getHomeDirectory(), "archive");
|
||||||
|
fs.delete(archivePath, true);
|
||||||
|
|
||||||
|
// prepare input path:
|
||||||
|
inputPath = new Path(fs.getHomeDirectory(), inputDir);
|
||||||
|
fs.delete(inputPath, true);
|
||||||
fs.mkdirs(inputPath);
|
fs.mkdirs(inputPath);
|
||||||
createFile(inputPath, "a", fs);
|
// create basic input files:
|
||||||
createFile(inputPath, "b", fs);
|
fileList.add(createFile(inputPath, fs, "a"));
|
||||||
createFile(inputPath, "c", fs);
|
fileList.add(createFile(inputPath, fs, "b"));
|
||||||
|
fileList.add(createFile(inputPath, fs, "c"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
try {
|
if (dfscluster != null) {
|
||||||
if (dfscluster != null) {
|
dfscluster.shutdown();
|
||||||
dfscluster.shutdown();
|
|
||||||
}
|
|
||||||
if (dfscluster != null) {
|
|
||||||
dfscluster.shutdown();
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
System.err.println(e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRelativePath() throws Exception {
|
public void testRelativePath() throws Exception {
|
||||||
fs.delete(archivePath, true);
|
|
||||||
|
|
||||||
final Path sub1 = new Path(inputPath, "dir1");
|
final Path sub1 = new Path(inputPath, "dir1");
|
||||||
fs.mkdirs(sub1);
|
fs.mkdirs(sub1);
|
||||||
createFile(sub1, "a", fs);
|
createFile(inputPath, fs, sub1.getName(), "a");
|
||||||
final FsShell shell = new FsShell(conf);
|
final FsShell shell = new FsShell(conf);
|
||||||
|
|
||||||
final List<String> originalPaths = lsr(shell, "input");
|
final List<String> originalPaths = lsr(shell, "input");
|
||||||
System.out.println("originalPath: " + originalPaths);
|
System.out.println("originalPaths: " + originalPaths);
|
||||||
final URI uri = fs.getUri();
|
|
||||||
final String prefix = "har://hdfs-" + uri.getHost() + ":" + uri.getPort()
|
|
||||||
+ archivePath.toUri().getPath() + Path.SEPARATOR;
|
|
||||||
|
|
||||||
{
|
// make the archive:
|
||||||
final String harName = "foo.har";
|
final String fullHarPathStr = makeArchive();
|
||||||
final String[] args = { "-archiveName", harName, "-p", "input", "*",
|
|
||||||
"archive" };
|
|
||||||
System.setProperty(HadoopArchives.TEST_HADOOP_ARCHIVES_JAR_PATH,
|
|
||||||
HADOOP_ARCHIVES_JAR);
|
|
||||||
final HadoopArchives har = new HadoopArchives(conf);
|
|
||||||
Assert.assertEquals(0, ToolRunner.run(har, args));
|
|
||||||
|
|
||||||
// compare results
|
// compare results:
|
||||||
final List<String> harPaths = lsr(shell, prefix + harName);
|
final List<String> harPaths = lsr(shell, fullHarPathStr);
|
||||||
Assert.assertEquals(originalPaths, harPaths);
|
Assert.assertEquals(originalPaths, harPaths);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPathWithSpaces() throws Exception {
|
public void testPathWithSpaces() throws Exception {
|
||||||
fs.delete(archivePath, true);
|
|
||||||
|
|
||||||
// create files/directories with spaces
|
// create files/directories with spaces
|
||||||
createFile(inputPath, "c c", fs);
|
createFile(inputPath, fs, "c c");
|
||||||
final Path sub1 = new Path(inputPath, "sub 1");
|
final Path sub1 = new Path(inputPath, "sub 1");
|
||||||
fs.mkdirs(sub1);
|
fs.mkdirs(sub1);
|
||||||
createFile(sub1, "file x y z", fs);
|
createFile(sub1, fs, "file x y z");
|
||||||
createFile(sub1, "file", fs);
|
createFile(sub1, fs, "file");
|
||||||
createFile(sub1, "x", fs);
|
createFile(sub1, fs, "x");
|
||||||
createFile(sub1, "y", fs);
|
createFile(sub1, fs, "y");
|
||||||
createFile(sub1, "z", fs);
|
createFile(sub1, fs, "z");
|
||||||
final Path sub2 = new Path(inputPath, "sub 1 with suffix");
|
final Path sub2 = new Path(inputPath, "sub 1 with suffix");
|
||||||
fs.mkdirs(sub2);
|
fs.mkdirs(sub2);
|
||||||
createFile(sub2, "z", fs);
|
createFile(sub2, fs, "z");
|
||||||
|
|
||||||
final FsShell shell = new FsShell(conf);
|
final FsShell shell = new FsShell(conf);
|
||||||
|
|
||||||
final String inputPathStr = inputPath.toUri().getPath();
|
final String inputPathStr = inputPath.toUri().getPath();
|
||||||
|
|
||||||
final List<String> originalPaths = lsr(shell, inputPathStr);
|
final List<String> originalPaths = lsr(shell, inputPathStr);
|
||||||
final URI uri = fs.getUri();
|
|
||||||
final String prefix = "har://hdfs-" + uri.getHost() + ":" + uri.getPort()
|
|
||||||
+ archivePath.toUri().getPath() + Path.SEPARATOR;
|
|
||||||
|
|
||||||
{// Enable space replacement
|
// make the archive:
|
||||||
final String harName = "foo.har";
|
final String fullHarPathStr = makeArchive();
|
||||||
final String[] args = { "-archiveName", harName, "-p", inputPathStr, "*",
|
|
||||||
archivePath.toString() };
|
|
||||||
System.setProperty(HadoopArchives.TEST_HADOOP_ARCHIVES_JAR_PATH,
|
|
||||||
HADOOP_ARCHIVES_JAR);
|
|
||||||
final HadoopArchives har = new HadoopArchives(conf);
|
|
||||||
Assert.assertEquals(0, ToolRunner.run(har, args));
|
|
||||||
|
|
||||||
// compare results
|
|
||||||
final List<String> harPaths = lsr(shell, prefix + harName);
|
|
||||||
Assert.assertEquals(originalPaths, harPaths);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// compare results
|
||||||
|
final List<String> harPaths = lsr(shell, fullHarPathStr);
|
||||||
|
Assert.assertEquals(originalPaths, harPaths);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<String> lsr(final FsShell shell, String dir)
|
private static List<String> lsr(final FsShell shell, String dir)
|
||||||
|
@ -222,4 +223,442 @@ public class TestHadoopArchives {
|
||||||
.println("lsr paths = " + paths.toString().replace(", ", ",\n "));
|
.println("lsr paths = " + paths.toString().replace(", ", ",\n "));
|
||||||
return paths;
|
return paths;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadFileContent() throws Exception {
|
||||||
|
fileList.add(createFile(inputPath, fs, "c c"));
|
||||||
|
final Path sub1 = new Path(inputPath, "sub 1");
|
||||||
|
fs.mkdirs(sub1);
|
||||||
|
fileList.add(createFile(inputPath, fs, sub1.getName(), "file x y z"));
|
||||||
|
fileList.add(createFile(inputPath, fs, sub1.getName(), "file"));
|
||||||
|
fileList.add(createFile(inputPath, fs, sub1.getName(), "x"));
|
||||||
|
fileList.add(createFile(inputPath, fs, sub1.getName(), "y"));
|
||||||
|
fileList.add(createFile(inputPath, fs, sub1.getName(), "z"));
|
||||||
|
final Path sub2 = new Path(inputPath, "sub 1 with suffix");
|
||||||
|
fs.mkdirs(sub2);
|
||||||
|
fileList.add(createFile(inputPath, fs, sub2.getName(), "z"));
|
||||||
|
// Generate a big binary file content:
|
||||||
|
final byte[] binContent = prepareBin();
|
||||||
|
fileList.add(createFile(inputPath, fs, binContent, sub2.getName(), "bin"));
|
||||||
|
fileList.add(createFile(inputPath, fs, new byte[0], sub2.getName(), "zero-length"));
|
||||||
|
|
||||||
|
final String fullHarPathStr = makeArchive();
|
||||||
|
|
||||||
|
// Create fresh HarFs:
|
||||||
|
final HarFileSystem harFileSystem = new HarFileSystem(fs);
|
||||||
|
try {
|
||||||
|
final URI harUri = new URI(fullHarPathStr);
|
||||||
|
harFileSystem.initialize(harUri, fs.getConf());
|
||||||
|
// now read the file content and compare it against the expected:
|
||||||
|
int readFileCount = 0;
|
||||||
|
for (final String pathStr0 : fileList) {
|
||||||
|
final Path path = new Path(fullHarPathStr + Path.SEPARATOR + pathStr0);
|
||||||
|
final String baseName = path.getName();
|
||||||
|
final FileStatus status = harFileSystem.getFileStatus(path);
|
||||||
|
if (status.isFile()) {
|
||||||
|
// read the file:
|
||||||
|
final byte[] actualContentSimple = readAllSimple(
|
||||||
|
harFileSystem.open(path), true);
|
||||||
|
|
||||||
|
final byte[] actualContentBuffer = readAllWithBuffer(
|
||||||
|
harFileSystem.open(path), true);
|
||||||
|
assertArrayEquals(actualContentSimple, actualContentBuffer);
|
||||||
|
|
||||||
|
final byte[] actualContentFully = readAllWithReadFully(
|
||||||
|
actualContentSimple.length,
|
||||||
|
harFileSystem.open(path), true);
|
||||||
|
assertArrayEquals(actualContentSimple, actualContentFully);
|
||||||
|
|
||||||
|
final byte[] actualContentSeek = readAllWithSeek(
|
||||||
|
actualContentSimple.length,
|
||||||
|
harFileSystem.open(path), true);
|
||||||
|
assertArrayEquals(actualContentSimple, actualContentSeek);
|
||||||
|
|
||||||
|
final byte[] actualContentRead4
|
||||||
|
= readAllWithRead4(harFileSystem.open(path), true);
|
||||||
|
assertArrayEquals(actualContentSimple, actualContentRead4);
|
||||||
|
|
||||||
|
final byte[] actualContentSkip = readAllWithSkip(
|
||||||
|
actualContentSimple.length,
|
||||||
|
harFileSystem.open(path),
|
||||||
|
harFileSystem.open(path),
|
||||||
|
true);
|
||||||
|
assertArrayEquals(actualContentSimple, actualContentSkip);
|
||||||
|
|
||||||
|
if ("bin".equals(baseName)) {
|
||||||
|
assertArrayEquals(binContent, actualContentSimple);
|
||||||
|
} else if ("zero-length".equals(baseName)) {
|
||||||
|
assertEquals(0, actualContentSimple.length);
|
||||||
|
} else {
|
||||||
|
String actual = new String(actualContentSimple, "UTF-8");
|
||||||
|
assertEquals(baseName, actual);
|
||||||
|
}
|
||||||
|
readFileCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals(fileList.size(), readFileCount);
|
||||||
|
} finally {
|
||||||
|
harFileSystem.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] readAllSimple(FSDataInputStream fsdis, boolean close) throws IOException {
|
||||||
|
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
|
try {
|
||||||
|
int b;
|
||||||
|
while (true) {
|
||||||
|
b = fsdis.read();
|
||||||
|
if (b < 0) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
baos.write(b);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
baos.close();
|
||||||
|
return baos.toByteArray();
|
||||||
|
} finally {
|
||||||
|
if (close) {
|
||||||
|
fsdis.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] readAllWithBuffer(FSDataInputStream fsdis, boolean close)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
final int available = fsdis.available();
|
||||||
|
final byte[] buffer;
|
||||||
|
final ByteArrayOutputStream baos;
|
||||||
|
if (available < 0) {
|
||||||
|
buffer = new byte[1024];
|
||||||
|
baos = new ByteArrayOutputStream(buffer.length * 2);
|
||||||
|
} else {
|
||||||
|
buffer = new byte[available];
|
||||||
|
baos = new ByteArrayOutputStream(available);
|
||||||
|
}
|
||||||
|
int readIntoBuffer = 0;
|
||||||
|
int read;
|
||||||
|
while (true) {
|
||||||
|
read = fsdis.read(buffer, readIntoBuffer, buffer.length - readIntoBuffer);
|
||||||
|
if (read < 0) {
|
||||||
|
// end of stream:
|
||||||
|
if (readIntoBuffer > 0) {
|
||||||
|
baos.write(buffer, 0, readIntoBuffer);
|
||||||
|
}
|
||||||
|
return baos.toByteArray();
|
||||||
|
} else {
|
||||||
|
readIntoBuffer += read;
|
||||||
|
if (readIntoBuffer == buffer.length) {
|
||||||
|
// buffer is full, need to clean the buffer.
|
||||||
|
// drop the buffered data to baos:
|
||||||
|
baos.write(buffer);
|
||||||
|
// reset the counter to start reading to the buffer beginning:
|
||||||
|
readIntoBuffer = 0;
|
||||||
|
} else if (readIntoBuffer > buffer.length) {
|
||||||
|
throw new IOException("Read more than the buffer length: "
|
||||||
|
+ readIntoBuffer + ", buffer length = " + buffer.length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (close) {
|
||||||
|
fsdis.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] readAllWithReadFully(int totalLength, FSDataInputStream fsdis, boolean close)
|
||||||
|
throws IOException {
|
||||||
|
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
|
// Simulate reading of some data structures of known length:
|
||||||
|
final byte[] buffer = new byte[17];
|
||||||
|
final int times = totalLength / buffer.length;
|
||||||
|
final int remainder = totalLength % buffer.length;
|
||||||
|
// it would be simpler to leave the position tracking to the
|
||||||
|
// InputStream, but we need to check the methods #readFully(2)
|
||||||
|
// and #readFully(4) that receive the position as a parameter:
|
||||||
|
int position = 0;
|
||||||
|
try {
|
||||||
|
// read "data structures":
|
||||||
|
for (int i=0; i<times; i++) {
|
||||||
|
fsdis.readFully(position, buffer);
|
||||||
|
position += buffer.length;
|
||||||
|
baos.write(buffer);
|
||||||
|
}
|
||||||
|
if (remainder > 0) {
|
||||||
|
// read the remainder:
|
||||||
|
fsdis.readFully(position, buffer, 0, remainder);
|
||||||
|
position += remainder;
|
||||||
|
baos.write(buffer, 0, remainder);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
fsdis.readFully(position, buffer, 0, 1);
|
||||||
|
assertTrue(false);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
// okay
|
||||||
|
}
|
||||||
|
assertEquals(totalLength, position);
|
||||||
|
final byte[] result = baos.toByteArray();
|
||||||
|
assertEquals(totalLength, result.length);
|
||||||
|
return result;
|
||||||
|
} finally {
|
||||||
|
if (close) {
|
||||||
|
fsdis.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] readAllWithRead4(FSDataInputStream fsdis, boolean close)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
|
final byte[] buffer = new byte[17];
|
||||||
|
int totalRead = 0;
|
||||||
|
int read;
|
||||||
|
while (true) {
|
||||||
|
read = fsdis.read(totalRead, buffer, 0, buffer.length);
|
||||||
|
if (read > 0) {
|
||||||
|
totalRead += read;
|
||||||
|
baos.write(buffer, 0, read);
|
||||||
|
} else if (read < 0) {
|
||||||
|
break; // EOF
|
||||||
|
} else {
|
||||||
|
// read == 0:
|
||||||
|
// zero result may be returned *only* in case if the 4th
|
||||||
|
// parameter is 0. Since in our case this is 'buffer.length',
|
||||||
|
// zero return value clearly indicates a bug:
|
||||||
|
throw new AssertionError("FSDataInputStream#read(4) returned 0, while " +
|
||||||
|
" the 4th method parameter is " + buffer.length + ".");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
final byte[] result = baos.toByteArray();
|
||||||
|
return result;
|
||||||
|
} finally {
|
||||||
|
if (close) {
|
||||||
|
fsdis.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] readAllWithSeek(final int totalLength,
|
||||||
|
final FSDataInputStream fsdis, final boolean close)
|
||||||
|
throws IOException {
|
||||||
|
final byte[] result = new byte[totalLength];
|
||||||
|
long pos;
|
||||||
|
try {
|
||||||
|
// read the data in the reverse order, from
|
||||||
|
// the tail to the head by pieces of 'buffer' length:
|
||||||
|
final byte[] buffer = new byte[17];
|
||||||
|
final int times = totalLength / buffer.length;
|
||||||
|
int read;
|
||||||
|
int expectedRead;
|
||||||
|
for (int i=times; i>=0; i--) {
|
||||||
|
pos = i * buffer.length;
|
||||||
|
fsdis.seek(pos);
|
||||||
|
// check that seek is successful:
|
||||||
|
assertEquals(pos, fsdis.getPos());
|
||||||
|
read = fsdis.read(buffer);
|
||||||
|
// check we read right number of bytes:
|
||||||
|
if (i == times) {
|
||||||
|
expectedRead = totalLength % buffer.length; // remainder
|
||||||
|
if (expectedRead == 0) {
|
||||||
|
// zero remainder corresponds to the EOS, so
|
||||||
|
// by the contract of DataInpitStream#read(byte[]) -1 should be
|
||||||
|
// returned:
|
||||||
|
expectedRead = -1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
expectedRead = buffer.length;
|
||||||
|
}
|
||||||
|
assertEquals(expectedRead, read);
|
||||||
|
if (read > 0) {
|
||||||
|
System.arraycopy(buffer, 0, result, (int)pos, read);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// finally, check that #seek() to not existing position leads to IOE:
|
||||||
|
expectSeekIOE(fsdis, Long.MAX_VALUE, "Seek to Long.MAX_VALUE should lead to IOE.");
|
||||||
|
expectSeekIOE(fsdis, Long.MIN_VALUE, "Seek to Long.MIN_VALUE should lead to IOE.");
|
||||||
|
long pp = -1L;
|
||||||
|
expectSeekIOE(fsdis, pp, "Seek to "+pp+" should lead to IOE.");
|
||||||
|
|
||||||
|
// NB: is is *possible* to #seek(length), but *impossible* to #seek(length + 1):
|
||||||
|
fsdis.seek(totalLength);
|
||||||
|
assertEquals(totalLength, fsdis.getPos());
|
||||||
|
pp = totalLength + 1;
|
||||||
|
expectSeekIOE(fsdis, pp, "Seek to the length position + 1 ("+pp+") should lead to IOE.");
|
||||||
|
|
||||||
|
return result;
|
||||||
|
} finally {
|
||||||
|
if (close) {
|
||||||
|
fsdis.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void expectSeekIOE(FSDataInputStream fsdis, long seekPos, String message) {
|
||||||
|
try {
|
||||||
|
fsdis.seek(seekPos);
|
||||||
|
assertTrue(message + " (Position = " + fsdis.getPos() + ")", false);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
// okay
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Reads data by chunks from 2 input streams:
|
||||||
|
* reads chunk from stream 1, and skips this chunk in the stream 2;
|
||||||
|
* Then reads next chunk from stream 2, and skips this chunk in stream 1.
|
||||||
|
*/
|
||||||
|
private static byte[] readAllWithSkip(
|
||||||
|
final int totalLength,
|
||||||
|
final FSDataInputStream fsdis1,
|
||||||
|
final FSDataInputStream fsdis2,
|
||||||
|
final boolean close)
|
||||||
|
throws IOException {
|
||||||
|
// test negative skip arg:
|
||||||
|
assertEquals(0, fsdis1.skip(-1));
|
||||||
|
// test zero skip arg:
|
||||||
|
assertEquals(0, fsdis1.skip(0));
|
||||||
|
|
||||||
|
final ByteArrayOutputStream baos = new ByteArrayOutputStream(totalLength);
|
||||||
|
try {
|
||||||
|
// read the data in the reverse order, from
|
||||||
|
// the tail to the head by pieces of 'buffer' length:
|
||||||
|
final byte[] buffer = new byte[17];
|
||||||
|
final int times = totalLength / buffer.length;
|
||||||
|
final int remainder = totalLength % buffer.length;
|
||||||
|
long skipped;
|
||||||
|
long expectedPosition;
|
||||||
|
int toGo;
|
||||||
|
for (int i=0; i<=times; i++) {
|
||||||
|
toGo = (i < times) ? buffer.length : remainder;
|
||||||
|
if (i % 2 == 0) {
|
||||||
|
fsdis1.readFully(buffer, 0, toGo);
|
||||||
|
skipped = skipUntilZero(fsdis2, toGo);
|
||||||
|
} else {
|
||||||
|
fsdis2.readFully(buffer, 0, toGo);
|
||||||
|
skipped = skipUntilZero(fsdis1, toGo);
|
||||||
|
}
|
||||||
|
if (i < times) {
|
||||||
|
assertEquals(buffer.length, skipped);
|
||||||
|
expectedPosition = (i + 1) * buffer.length;
|
||||||
|
} else {
|
||||||
|
// remainder:
|
||||||
|
if (remainder > 0) {
|
||||||
|
assertEquals(remainder, skipped);
|
||||||
|
} else {
|
||||||
|
assertEquals(0, skipped);
|
||||||
|
}
|
||||||
|
expectedPosition = totalLength;
|
||||||
|
}
|
||||||
|
// check if the 2 streams have equal and correct positions:
|
||||||
|
assertEquals(expectedPosition, fsdis1.getPos());
|
||||||
|
assertEquals(expectedPosition, fsdis2.getPos());
|
||||||
|
// save the read data:
|
||||||
|
if (toGo > 0) {
|
||||||
|
baos.write(buffer, 0, toGo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// finally, check up if ended stream cannot skip:
|
||||||
|
assertEquals(0, fsdis1.skip(-1));
|
||||||
|
assertEquals(0, fsdis1.skip(0));
|
||||||
|
assertEquals(0, fsdis1.skip(1));
|
||||||
|
assertEquals(0, fsdis1.skip(Long.MAX_VALUE));
|
||||||
|
|
||||||
|
return baos.toByteArray();
|
||||||
|
} finally {
|
||||||
|
if (close) {
|
||||||
|
fsdis1.close();
|
||||||
|
fsdis2.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static long skipUntilZero(final FilterInputStream fis,
|
||||||
|
final long toSkip) throws IOException {
|
||||||
|
long skipped = 0;
|
||||||
|
long remainsToSkip = toSkip;
|
||||||
|
long s;
|
||||||
|
while (skipped < toSkip) {
|
||||||
|
s = fis.skip(remainsToSkip); // actually skippped
|
||||||
|
if (s == 0) {
|
||||||
|
return skipped; // EOF or impossible to skip.
|
||||||
|
}
|
||||||
|
skipped += s;
|
||||||
|
remainsToSkip -= s;
|
||||||
|
}
|
||||||
|
return skipped;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] prepareBin() {
|
||||||
|
byte[] bb = new byte[77777];
|
||||||
|
for (int i=0; i<bb.length; i++) {
|
||||||
|
// Generate unique values, as possible:
|
||||||
|
double d = Math.log(i + 2);
|
||||||
|
long bits = Double.doubleToLongBits(d);
|
||||||
|
bb[i] = (byte)bits;
|
||||||
|
}
|
||||||
|
return bb;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Run the HadoopArchives tool to create an archive on the
|
||||||
|
* given file system.
|
||||||
|
*/
|
||||||
|
private String makeArchive() throws Exception {
|
||||||
|
final String inputPathStr = inputPath.toUri().getPath();
|
||||||
|
System.out.println("inputPathStr = " + inputPathStr);
|
||||||
|
|
||||||
|
final URI uri = fs.getUri();
|
||||||
|
final String prefix = "har://hdfs-" + uri.getHost() + ":" + uri.getPort()
|
||||||
|
+ archivePath.toUri().getPath() + Path.SEPARATOR;
|
||||||
|
|
||||||
|
final String harName = "foo.har";
|
||||||
|
final String fullHarPathStr = prefix + harName;
|
||||||
|
final String[] args = { "-archiveName", harName, "-p", inputPathStr, "*",
|
||||||
|
archivePath.toString() };
|
||||||
|
System.setProperty(HadoopArchives.TEST_HADOOP_ARCHIVES_JAR_PATH,
|
||||||
|
HADOOP_ARCHIVES_JAR);
|
||||||
|
final HadoopArchives har = new HadoopArchives(conf);
|
||||||
|
assertEquals(0, ToolRunner.run(har, args));
|
||||||
|
return fullHarPathStr;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
/*
|
||||||
|
* Tests copying from archive file system to a local file system
|
||||||
|
*/
|
||||||
|
public void testCopyToLocal() throws Exception {
|
||||||
|
final String fullHarPathStr = makeArchive();
|
||||||
|
|
||||||
|
// make path to copy the file to:
|
||||||
|
final String tmpDir
|
||||||
|
= System.getProperty("test.build.data","build/test/data") + "/work-dir/har-fs-tmp";
|
||||||
|
final Path tmpPath = new Path(tmpDir);
|
||||||
|
final LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
|
||||||
|
localFs.delete(tmpPath, true);
|
||||||
|
localFs.mkdirs(tmpPath);
|
||||||
|
assertTrue(localFs.exists(tmpPath));
|
||||||
|
|
||||||
|
// Create fresh HarFs:
|
||||||
|
final HarFileSystem harFileSystem = new HarFileSystem(fs);
|
||||||
|
try {
|
||||||
|
final URI harUri = new URI(fullHarPathStr);
|
||||||
|
harFileSystem.initialize(harUri, fs.getConf());
|
||||||
|
|
||||||
|
final Path sourcePath = new Path(fullHarPathStr + Path.SEPARATOR + "a");
|
||||||
|
final Path targetPath = new Path(tmpPath, "straus");
|
||||||
|
// copy the Har file to a local file system:
|
||||||
|
harFileSystem.copyToLocalFile(false, sourcePath, targetPath);
|
||||||
|
FileStatus straus = localFs.getFileStatus(targetPath);
|
||||||
|
// the file should contain just 1 character:
|
||||||
|
assertEquals(1, straus.getLen());
|
||||||
|
} finally {
|
||||||
|
harFileSystem.close();
|
||||||
|
localFs.delete(tmpPath, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,6 +64,9 @@ Release 2.3.0 - UNRELEASED
|
||||||
YARN-1182. MiniYARNCluster creates and inits the RM/NM only on start()
|
YARN-1182. MiniYARNCluster creates and inits the RM/NM only on start()
|
||||||
(Karthik Kambatla via Sandy Ryza)
|
(Karthik Kambatla via Sandy Ryza)
|
||||||
|
|
||||||
|
HADOOP-9598. Improve code coverage of RMAdminCLI (Aleksey Gorshkov and
|
||||||
|
Andrey Klochkov via jeagles)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -162,7 +162,7 @@ public class RMAdminCLI extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException {
|
protected ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException {
|
||||||
// Get the current configuration
|
// Get the current configuration
|
||||||
final YarnConfiguration conf = new YarnConfiguration(getConf());
|
final YarnConfiguration conf = new YarnConfiguration(getConf());
|
||||||
return ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class);
|
return ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class);
|
||||||
|
|
|
@ -0,0 +1,243 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.argThat;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.PrintStream;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
|
||||||
|
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.ArgumentMatcher;
|
||||||
|
|
||||||
|
public class TestRMAdminCLI {
|
||||||
|
|
||||||
|
private ResourceManagerAdministrationProtocol admin;
|
||||||
|
private RMAdminCLI rmAdminCLI;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void configure() {
|
||||||
|
admin = mock(ResourceManagerAdministrationProtocol.class);
|
||||||
|
rmAdminCLI = new RMAdminCLI() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ResourceManagerAdministrationProtocol createAdminProtocol()
|
||||||
|
throws IOException {
|
||||||
|
return admin;
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=500)
|
||||||
|
public void testRefreshQueues() throws Exception {
|
||||||
|
String[] args = { "-refreshQueues" };
|
||||||
|
assertEquals(0, rmAdminCLI.run(args));
|
||||||
|
verify(admin).refreshQueues(any(RefreshQueuesRequest.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=500)
|
||||||
|
public void testRefreshUserToGroupsMappings() throws Exception {
|
||||||
|
String[] args = { "-refreshUserToGroupsMappings" };
|
||||||
|
assertEquals(0, rmAdminCLI.run(args));
|
||||||
|
verify(admin).refreshUserToGroupsMappings(
|
||||||
|
any(RefreshUserToGroupsMappingsRequest.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=500)
|
||||||
|
public void testRefreshSuperUserGroupsConfiguration() throws Exception {
|
||||||
|
String[] args = { "-refreshSuperUserGroupsConfiguration" };
|
||||||
|
assertEquals(0, rmAdminCLI.run(args));
|
||||||
|
verify(admin).refreshSuperUserGroupsConfiguration(
|
||||||
|
any(RefreshSuperUserGroupsConfigurationRequest.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=500)
|
||||||
|
public void testRefreshAdminAcls() throws Exception {
|
||||||
|
String[] args = { "-refreshAdminAcls" };
|
||||||
|
assertEquals(0, rmAdminCLI.run(args));
|
||||||
|
verify(admin).refreshAdminAcls(any(RefreshAdminAclsRequest.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=500)
|
||||||
|
public void testRefreshServiceAcl() throws Exception {
|
||||||
|
String[] args = { "-refreshServiceAcl" };
|
||||||
|
assertEquals(0, rmAdminCLI.run(args));
|
||||||
|
verify(admin).refreshServiceAcls(any(RefreshServiceAclsRequest.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=500)
|
||||||
|
public void testRefreshNodes() throws Exception {
|
||||||
|
String[] args = { "-refreshNodes" };
|
||||||
|
assertEquals(0, rmAdminCLI.run(args));
|
||||||
|
verify(admin).refreshNodes(any(RefreshNodesRequest.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=500)
|
||||||
|
public void testGetGroups() throws Exception {
|
||||||
|
when(admin.getGroupsForUser(eq("admin"))).thenReturn(
|
||||||
|
new String[] {"group1", "group2"});
|
||||||
|
PrintStream origOut = System.out;
|
||||||
|
PrintStream out = mock(PrintStream.class);
|
||||||
|
System.setOut(out);
|
||||||
|
try {
|
||||||
|
String[] args = { "-getGroups", "admin" };
|
||||||
|
assertEquals(0, rmAdminCLI.run(args));
|
||||||
|
verify(admin).getGroupsForUser(eq("admin"));
|
||||||
|
verify(out).println(argThat(new ArgumentMatcher<StringBuilder>() {
|
||||||
|
@Override
|
||||||
|
public boolean matches(Object argument) {
|
||||||
|
return ("" + argument).equals("admin : group1 group2");
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
} finally {
|
||||||
|
System.setOut(origOut);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test printing of help messages
|
||||||
|
*/
|
||||||
|
@Test(timeout=500)
|
||||||
|
public void testHelp() throws Exception {
|
||||||
|
PrintStream oldOutPrintStream = System.out;
|
||||||
|
PrintStream oldErrPrintStream = System.err;
|
||||||
|
ByteArrayOutputStream dataOut = new ByteArrayOutputStream();
|
||||||
|
ByteArrayOutputStream dataErr = new ByteArrayOutputStream();
|
||||||
|
System.setOut(new PrintStream(dataOut));
|
||||||
|
System.setErr(new PrintStream(dataErr));
|
||||||
|
try {
|
||||||
|
String[] args = { "-help" };
|
||||||
|
assertEquals(0, rmAdminCLI.run(args));
|
||||||
|
assertTrue(dataOut
|
||||||
|
.toString()
|
||||||
|
.contains(
|
||||||
|
"rmadmin is the command to execute Map-Reduce" +
|
||||||
|
" administrative commands."));
|
||||||
|
assertTrue(dataOut
|
||||||
|
.toString()
|
||||||
|
.contains(
|
||||||
|
"hadoop rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" +
|
||||||
|
"UserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
|
||||||
|
"[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" +
|
||||||
|
" [username]] [-help [cmd]]"));
|
||||||
|
assertTrue(dataOut
|
||||||
|
.toString()
|
||||||
|
.contains(
|
||||||
|
"-refreshQueues: Reload the queues' acls, states and scheduler " +
|
||||||
|
"specific properties."));
|
||||||
|
assertTrue(dataOut
|
||||||
|
.toString()
|
||||||
|
.contains(
|
||||||
|
"-refreshNodes: Refresh the hosts information at the " +
|
||||||
|
"ResourceManager."));
|
||||||
|
assertTrue(dataOut.toString().contains(
|
||||||
|
"-refreshUserToGroupsMappings: Refresh user-to-groups mappings"));
|
||||||
|
assertTrue(dataOut
|
||||||
|
.toString()
|
||||||
|
.contains(
|
||||||
|
"-refreshSuperUserGroupsConfiguration: Refresh superuser proxy" +
|
||||||
|
" groups mappings"));
|
||||||
|
assertTrue(dataOut
|
||||||
|
.toString()
|
||||||
|
.contains(
|
||||||
|
"-refreshAdminAcls: Refresh acls for administration of " +
|
||||||
|
"ResourceManager"));
|
||||||
|
assertTrue(dataOut
|
||||||
|
.toString()
|
||||||
|
.contains(
|
||||||
|
"-refreshServiceAcl: Reload the service-level authorization" +
|
||||||
|
" policy file"));
|
||||||
|
assertTrue(dataOut
|
||||||
|
.toString()
|
||||||
|
.contains(
|
||||||
|
"-help [cmd]: \tDisplays help for the given command or all " +
|
||||||
|
"commands if none"));
|
||||||
|
|
||||||
|
testError(new String[] { "-help", "-refreshQueues" },
|
||||||
|
"Usage: java RMAdmin [-refreshQueues]", dataErr, 0);
|
||||||
|
testError(new String[] { "-help", "-refreshNodes" },
|
||||||
|
"Usage: java RMAdmin [-refreshNodes]", dataErr, 0);
|
||||||
|
testError(new String[] { "-help", "-refreshUserToGroupsMappings" },
|
||||||
|
"Usage: java RMAdmin [-refreshUserToGroupsMappings]", dataErr, 0);
|
||||||
|
testError(
|
||||||
|
new String[] { "-help", "-refreshSuperUserGroupsConfiguration" },
|
||||||
|
"Usage: java RMAdmin [-refreshSuperUserGroupsConfiguration]",
|
||||||
|
dataErr, 0);
|
||||||
|
testError(new String[] { "-help", "-refreshAdminAcls" },
|
||||||
|
"Usage: java RMAdmin [-refreshAdminAcls]", dataErr, 0);
|
||||||
|
testError(new String[] { "-help", "-refreshService" },
|
||||||
|
"Usage: java RMAdmin [-refreshServiceAcl]", dataErr, 0);
|
||||||
|
testError(new String[] { "-help", "-getGroups" },
|
||||||
|
"Usage: java RMAdmin [-getGroups [username]]", dataErr, 0);
|
||||||
|
|
||||||
|
|
||||||
|
testError(new String[] { "-help", "-badParameter" },
|
||||||
|
"Usage: java RMAdmin", dataErr, 0);
|
||||||
|
testError(new String[] { "-badParameter" },
|
||||||
|
"badParameter: Unknown command", dataErr, -1);
|
||||||
|
} finally {
|
||||||
|
System.setOut(oldOutPrintStream);
|
||||||
|
System.setErr(oldErrPrintStream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=500)
|
||||||
|
public void testException() throws Exception {
|
||||||
|
PrintStream oldErrPrintStream = System.err;
|
||||||
|
ByteArrayOutputStream dataErr = new ByteArrayOutputStream();
|
||||||
|
System.setErr(new PrintStream(dataErr));
|
||||||
|
try {
|
||||||
|
when(admin.refreshQueues(any(RefreshQueuesRequest.class)))
|
||||||
|
.thenThrow(new IOException("test exception"));
|
||||||
|
String[] args = { "-refreshQueues" };
|
||||||
|
|
||||||
|
assertEquals(-1, rmAdminCLI.run(args));
|
||||||
|
verify(admin).refreshQueues(any(RefreshQueuesRequest.class));
|
||||||
|
assertTrue(dataErr.toString().contains("refreshQueues: test exception"));
|
||||||
|
} finally {
|
||||||
|
System.setErr(oldErrPrintStream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testError(String[] args, String template,
|
||||||
|
ByteArrayOutputStream data, int resultCode) throws Exception {
|
||||||
|
assertEquals(resultCode, rmAdminCLI.run(args));
|
||||||
|
assertTrue(data.toString().contains(template));
|
||||||
|
data.reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue