Merge r1454237 through r1455388 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1455390 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2013-03-12 01:58:43 +00:00
commit f35cd45811
32 changed files with 734 additions and 215 deletions

View File

@@ -8,6 +8,12 @@ Trunk (Unreleased)
     FSDataOutputStream.sync() and Syncable.sync().  (szetszwo)

     HADOOP-8886. Remove KFS support. (eli)

+    HADOOP-9163 The rpc msg in ProtobufRpcEngine.proto should be moved out to
+    avoid an extra copy (Sanjay Radia)
+
+    HADOOP-9151 Include RPC error info in RpcResponseHeader instead of sending
+    it separately (sanjay Radia)
+
   NEW FEATURES
@@ -157,8 +163,6 @@ Trunk (Unreleased)
     HADOOP-9112. test-patch should -1 for @Tests without a timeout
     (Surenkumar Nihalani via bobby)

-    HADOOP-9163 The rpc msg in ProtobufRpcEngine.proto should be moved out to
-    avoid an extra copy (Sanjay Radia)
-
   BUG FIXES
@@ -538,6 +542,9 @@ Release 2.0.5-beta - UNRELEASED
     HADOOP-9369. DNS#reverseDns() can return hostname with . appended at the
     end. (Karthik Kambatla via atm)

+    HADOOP-9379. capture the ulimit info after printing the log to the
+    console. (Arpit Gupta via suresh)
+
 Release 2.0.4-alpha - UNRELEASED

   INCOMPATIBLE CHANGES

View File

@@ -308,4 +308,11 @@
     <Method name="removeRenewAction" />
     <Bug pattern="BC_UNCONFIRMED_CAST" />
   </Match>
+
+  <!-- Inconsistent synchronization flagged by findbugs is not valid. -->
+  <Match>
+    <Class name="org.apache.hadoop.ipc.Client$Connection" />
+    <Field name="in" />
+    <Bug pattern="IS2_INCONSISTENT_SYNC" />
+  </Match>
 </FindBugsFilter>

View File

@@ -156,6 +156,7 @@ case $startStop in
     esac
     echo $! > $pid
     sleep 1
+    head "$log"
     # capture the ulimit output
     if [ "true" = "$starting_secure_dn" ]; then
       echo "ulimit -a for secure datanode user $HADOOP_SECURE_DN_USER" >> $log
@@ -165,7 +166,6 @@ case $startStop in
       echo "ulimit -a for user $USER" >> $log
       ulimit -a >> $log 2>&1
     fi
-    head -30 "$log"
     sleep 3;
     if ! ps -p $! > /dev/null ; then
       exit 1

View File

@@ -59,7 +59,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
@@ -945,31 +944,38 @@ public class Client {
       touch();
       try {
-        RpcResponseHeaderProto response =
-            RpcResponseHeaderProto.parseDelimitedFrom(in);
-        if (response == null) {
+        RpcResponseHeaderProto header =
+            RpcResponseHeaderProto.parseDelimitedFrom(in);
+        if (header == null) {
           throw new IOException("Response is null.");
         }

-        int callId = response.getCallId();
+        int callId = header.getCallId();
         if (LOG.isDebugEnabled())
           LOG.debug(getName() + " got value #" + callId);

         Call call = calls.get(callId);
-        RpcStatusProto status = response.getStatus();
+        RpcStatusProto status = header.getStatus();
         if (status == RpcStatusProto.SUCCESS) {
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
           value.readFields(in);                 // read value
           call.setRpcResponse(value);
           calls.remove(callId);
-        } else if (status == RpcStatusProto.ERROR) {
-          call.setException(new RemoteException(WritableUtils.readString(in),
-                                                WritableUtils.readString(in)));
-          calls.remove(callId);
-        } else if (status == RpcStatusProto.FATAL) {
-          // Close the connection
-          markClosed(new RemoteException(WritableUtils.readString(in),
-                                         WritableUtils.readString(in)));
+        } else { // Rpc Request failed
+          final String exceptionClassName = header.hasExceptionClassName() ?
+                header.getExceptionClassName() :
+                  "ServerDidNotSetExceptionClassName";
+          final String errorMsg = header.hasErrorMsg() ?
+                header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
+          RemoteException re =
+              new RemoteException(exceptionClassName, errorMsg);
+          if (status == RpcStatusProto.ERROR) {
+            call.setException(re);
+            calls.remove(callId);
+          } else if (status == RpcStatusProto.FATAL) {
+            // Close the connection
+            markClosed(re);
+          }
         }
       } catch (IOException e) {
         markClosed(e);

View File

@@ -2001,6 +2001,7 @@ public abstract class Server {
         RpcResponseHeaderProto.newBuilder();
     response.setCallId(call.callId);
     response.setStatus(status);
+    response.setServerIpcVersionNum(Server.CURRENT_VERSION);

     if (status == RpcStatusProto.SUCCESS) {
@@ -2017,13 +2018,10 @@ public abstract class Server {
             StringUtils.stringifyException(t));
         return;
       }
-    } else {
-      if (status == RpcStatusProto.FATAL) {
-        response.setServerIpcVersionNum(Server.CURRENT_VERSION);
-      }
+    } else { // Rpc Failure
+      response.setExceptionClassName(errorClass);
+      response.setErrorMsg(error);
       response.build().writeDelimitedTo(out);
-      WritableUtils.writeString(out, errorClass);
-      WritableUtils.writeString(out, error);
     }
     if (call.connection.useWrap) {
       wrapWithSasl(responseBuf, call);

View File

@@ -23,11 +23,10 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Shell;

 /**
  * Class that provides utility functions for checking disk problem
@@ -36,10 +35,16 @@ import org.apache.hadoop.fs.permission.FsPermission;
 @InterfaceStability.Unstable
 public class DiskChecker {

+  private static final long SHELL_TIMEOUT = 10 * 1000;
+
   public static class DiskErrorException extends IOException {
     public DiskErrorException(String msg) {
       super(msg);
     }
+
+    public DiskErrorException(String msg, Throwable cause) {
+      super(msg, cause);
+    }
   }

   public static class DiskOutOfSpaceException extends IOException {
@@ -85,25 +90,11 @@
    * @throws DiskErrorException
    */
   public static void checkDir(File dir) throws DiskErrorException {
-    if (!mkdirsWithExistsCheck(dir))
+    if (!mkdirsWithExistsCheck(dir)) {
       throw new DiskErrorException("Can not create directory: "
                                    + dir.toString());
-
-    if (!dir.isDirectory())
-      throw new DiskErrorException("Not a directory: "
-                                   + dir.toString());
-
-    if (!dir.canRead())
-      throw new DiskErrorException("Directory is not readable: "
-                                   + dir.toString());
-
-    if (!dir.canWrite())
-      throw new DiskErrorException("Directory is not writable: "
-                                   + dir.toString());
-
-    if (!dir.canExecute())
-      throw new DiskErrorException("Directory is not executable: "
-                                   + dir.toString());
+    }
+    checkDirAccess(dir);
   }

   /**
@@ -152,24 +143,102 @@ public class DiskChecker {
                               FsPermission expected)
   throws DiskErrorException, IOException {
     mkdirsWithExistsAndPermissionCheck(localFS, dir, expected);
-
-    FileStatus stat = localFS.getFileStatus(dir);
-    FsPermission actual = stat.getPermission();
-
-    if (!stat.isDirectory())
-      throw new DiskErrorException("not a directory: "+ dir.toString());
-
-    FsAction user = actual.getUserAction();
-    if (!user.implies(FsAction.READ))
-      throw new DiskErrorException("directory is not readable: "
-                                   + dir.toString());
-
-    if (!user.implies(FsAction.WRITE))
-      throw new DiskErrorException("directory is not writable: "
-                                   + dir.toString());
-
-    if (!user.implies(FsAction.EXECUTE))
-      throw new DiskErrorException("directory is not listable: "
-                                   + dir.toString());
+    checkDirAccess(localFS.pathToFile(dir));
+  }
+
+  /**
+   * Checks that the given file is a directory and that the current running
+   * process can read, write, and execute it.
+   *
+   * @param dir File to check
+   * @throws DiskErrorException if dir is not a directory, not readable, not
+   *   writable, or not executable
+   */
+  private static void checkDirAccess(File dir) throws DiskErrorException {
+    if (!dir.isDirectory()) {
+      throw new DiskErrorException("Not a directory: "
+                                   + dir.toString());
+    }
+
+    if (Shell.WINDOWS) {
+      checkAccessByFileSystemInteraction(dir);
+    } else {
+      checkAccessByFileMethods(dir);
+    }
+  }
+
+  /**
+   * Checks that the current running process can read, write, and execute the
+   * given directory by using methods of the File object.
+   *
+   * @param dir File to check
+   * @throws DiskErrorException if dir is not readable, not writable, or not
+   *   executable
+   */
+  private static void checkAccessByFileMethods(File dir)
+      throws DiskErrorException {
+    if (!dir.canRead()) {
+      throw new DiskErrorException("Directory is not readable: "
+                                   + dir.toString());
+    }
+
+    if (!dir.canWrite()) {
+      throw new DiskErrorException("Directory is not writable: "
+                                   + dir.toString());
+    }
+
+    if (!dir.canExecute()) {
+      throw new DiskErrorException("Directory is not executable: "
+                                   + dir.toString());
+    }
+  }
+
+  /**
+   * Checks that the current running process can read, write, and execute the
+   * given directory by attempting each of those operations on the file system.
+   * This method contains several workarounds to known JVM bugs that cause
+   * File.canRead, File.canWrite, and File.canExecute to return incorrect results
+   * on Windows with NTFS ACLs.  See:
+   * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6203387
+   * These bugs are supposed to be fixed in JDK7.
+   *
+   * @param dir File to check
+   * @throws DiskErrorException if dir is not readable, not writable, or not
+   *   executable
+   */
+  private static void checkAccessByFileSystemInteraction(File dir)
+      throws DiskErrorException {
+    // Make sure we can read the directory by listing it.
+    if (dir.list() == null) {
+      throw new DiskErrorException("Directory is not readable: "
+                                   + dir.toString());
+    }
+
+    // Make sure we can write to the directory by creating a temp file in it.
+    try {
+      File tempFile = File.createTempFile("checkDirAccess", null, dir);
+      if (!tempFile.delete()) {
+        throw new DiskErrorException("Directory is not writable: "
+                                     + dir.toString());
+      }
+    } catch (IOException e) {
+      throw new DiskErrorException("Directory is not writable: "
+                                   + dir.toString(), e);
+    }
+
+    // Make sure the directory is executable by trying to cd into it.  This
+    // launches a separate process.  It does not change the working directory of
+    // the current process.
+    try {
+      String[] cdCmd = new String[] { "cmd", "/C", "cd",
+          dir.getAbsolutePath() };
+      Shell.execCommand(null, cdCmd, SHELL_TIMEOUT);
+    } catch (Shell.ExitCodeException e) {
+      throw new DiskErrorException("Directory is not executable: "
+                                   + dir.toString(), e);
+    } catch (IOException e) {
+      throw new DiskErrorException("Directory is not executable: "
+                                   + dir.toString(), e);
+    }
   }
 }
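For callers nothing changes at the API surface: checkDir(File) still creates the directory if needed and throws DiskErrorException on the first failed check; it now simply routes through checkDirAccess(), which picks the Windows-safe strategy automatically. A minimal caller sketch (the directory path is a made-up example, not from this commit):

import java.io.File;

import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;

public class DiskCheckExample {
  public static void main(String[] args) {
    File dir = new File("/tmp/data");   // hypothetical data directory
    try {
      // Creates the directory if missing, then verifies it is a directory
      // the current process can read, write, and execute.
      DiskChecker.checkDir(dir);
      System.out.println(dir + " passed the disk check");
    } catch (DiskErrorException e) {
      System.err.println("Disk check failed: " + e.getMessage());
    }
  }
}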

View File

@@ -70,12 +70,11 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
 * | RpcResponseHeaderProto - serialized delimited ie has len         |
 * +------------------------------------------------------------------+
 * | if request is successful:                                        |
-* |   - RpcResponse -  The actual rpc response  bytes                |
-* |     This response is serialized based on RpcKindProto            |
+* |   - RpcResponse -  The actual rpc response  bytes follow         |
+*       the response header                                          |
+* |     This response is serialized based on RpcKindProto            |
 * | if request fails :                                               |
-* |   - length (4 byte int) + Class name of exception - UTF-8 string |
-* |   - length (4 byte int) + Stacktrace - UTF-8 string              |
-* |   if the strings are null then the length is -1                  |
+* |   The rpc response header contains the necessary info            |
 * +------------------------------------------------------------------+
 *
 */
@@ -88,5 +87,7 @@ message RpcResponseHeaderProto {
   required uint32 callId = 1; // callId used in Request
   required RpcStatusProto status = 2;
-  optional uint32 serverIpcVersionNum = 3; // in case of an fatal IPC error
+  optional uint32 serverIpcVersionNum = 3; // Sent if success or fail
+  optional string exceptionClassName = 4;  // if request fails
+  optional string errorMsg = 5;  // if request fails, often contains strack trace
 }
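Read as code, the new contract is: parse one delimited header, then either read the response body or surface the error directly from the header — no trailing length-prefixed UTF-8 strings on the wire. A condensed sketch of the read side, mirroring the Client change earlier in this commit (imports of the generated protobuf classes are omitted, since their package depends on how the .proto is compiled):

import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;

public class ResponseReadSketch {
  // Returns normally only on SUCCESS; ERROR and FATAL both carry their
  // details inside the header itself.
  static void readResponse(DataInputStream in, Writable value)
      throws IOException {
    RpcResponseHeaderProto header =
        RpcResponseHeaderProto.parseDelimitedFrom(in);
    if (header == null) {
      throw new IOException("Response is null.");
    }
    if (header.getStatus() == RpcStatusProto.SUCCESS) {
      value.readFields(in);   // rpc response bytes follow the header
    } else {
      throw new RemoteException(header.getExceptionClassName(),
          header.getErrorMsg());
    }
  }
}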

View File

@@ -25,10 +25,13 @@ import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 import static org.apache.hadoop.test.MockitoMaker.*;
-import org.apache.hadoop.fs.permission.FsPermission;
+import static org.apache.hadoop.fs.permission.FsAction.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Shell;
@@ -110,29 +113,21 @@ public class TestDiskChecker {
   private void _checkDirs(boolean isDir, FsPermission perm, boolean success)
       throws Throwable {
-    File localDir = make(stub(File.class).returning(true).from.exists());
-    when(localDir.mkdir()).thenReturn(true);
-    Path dir = mock(Path.class);
-    LocalFileSystem fs = make(stub(LocalFileSystem.class)
-        .returning(localDir).from.pathToFile(dir));
-    FileStatus stat = make(stub(FileStatus.class)
-        .returning(perm).from.getPermission());
-    when(stat.isDirectory()).thenReturn(isDir);
-    when(fs.getFileStatus(dir)).thenReturn(stat);
-
+    File localDir = File.createTempFile("test", "tmp");
+    if (isDir) {
+      localDir.delete();
+      localDir.mkdir();
+    }
+    Shell.execCommand(Shell.getSetPermissionCommand(String.format("%04o",
+      perm.toShort()), false, localDir.getAbsolutePath()));
     try {
-      DiskChecker.checkDir(fs, dir, perm);
-
-      verify(stat).isDirectory();
-      verify(fs, times(2)).getFileStatus(dir);
-      verify(stat, times(2)).getPermission();
+      DiskChecker.checkDir(FileSystem.getLocal(new Configuration()),
+        new Path(localDir.getAbsolutePath()), perm);
       assertTrue("checkDir success", success);
-    }
-    catch (DiskErrorException e) {
+    } catch (DiskErrorException e) {
       assertFalse("checkDir success", success);
-      e.printStackTrace();
     }
-    System.out.println("checkDir success: "+ success);
+    localDir.delete();
   }

   /**
@@ -168,8 +163,10 @@ public class TestDiskChecker {
   private void _checkDirs(boolean isDir, String perm, boolean success)
       throws Throwable {
     File localDir = File.createTempFile("test", "tmp");
-    localDir.delete();
-    localDir.mkdir();
+    if (isDir) {
+      localDir.delete();
+      localDir.mkdir();
+    }
     Shell.execCommand(Shell.getSetPermissionCommand(perm, false,
       localDir.getAbsolutePath()));
     try {

View File

@@ -306,7 +306,10 @@ Trunk (Unreleased)
     HDFS-4502. JsonUtil.toFileStatus(..) should check if the fileId property
     exists.  (Brandon Li via suresh)

-  BREAKDOWN OF HADOOP-8562 SUBTASKS
+    HDFS-4391. TestDataTransferKeepalive fails when tests are executed in a
+    certain order. (Andrew Wang via atm)
+
+  BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS

     HDFS-4145. Merge hdfs cmd line scripts from branch-1-win. (David Lao,
     Bikas Saha, Lauren Yang, Chuan Liu, Thejas M Nair and Ivan Mitic via suresh)
@@ -320,6 +323,10 @@ Trunk (Unreleased)
     HDFS-4297. Fix issues related to datanode concurrent reading and writing on
     Windows. (Arpit Agarwal, Chuan Liu via suresh)

+    HDFS-4573. Fix TestINodeFile on Windows. (Arpit Agarwal via suresh)
+
+    HDFS-4572. Fix TestJournal failures on Windows. (Arpit Agarwal via suresh)
+
 Release 2.0.5-beta - UNRELEASED

   INCOMPATIBLE CHANGES
@@ -376,6 +383,9 @@ Release 2.0.5-beta - UNRELEASED
     HDFS-4565. Use DFSUtil.getSpnegoKeytabKey() to get the spnego keytab key
     in secondary namenode and namenode http server. (Arpit Gupta via suresh)

+    HDFS-4571. WebHDFS should not set the service hostname on the server side.
+    (tucu)
+
 Release 2.0.4-alpha - UNRELEASED

   INCOMPATIBLE CHANGES
@@ -2378,6 +2388,12 @@ Release 0.23.7 - UNRELEASED
     HDFS-4566. Webdhfs token cancelation should use authentication (daryn)

+    HDFS-4567. Webhdfs does not need a token for token operations (daryn via
+    kihwal)
+
+    HDFS-4577. Webhdfs operations should declare if authentication is required
+    (daryn via kihwal)
+
 Release 0.23.6 - UNRELEASED

   INCOMPATIBLE CHANGES

View File

@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.io.IOException;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedListMultimap;
 import org.apache.commons.logging.Log;
@@ -253,7 +254,8 @@ class SocketCache {
   /**
    * Empty the cache, and close all sockets.
    */
-  private synchronized void clear() {
+  @VisibleForTesting
+  protected synchronized void clear() {
     for (SocketAndStreams sockAndStream : multimap.values()) {
       sockAndStream.close();
     }

View File

@@ -215,8 +215,8 @@ class Journal implements Closeable {
   @Override // Closeable
   public void close() throws IOException {
     storage.close();
-
     IOUtils.closeStream(committedTxnId);
+    IOUtils.closeStream(curSegment);
   }

   JNStorage getStorage() {

View File

@@ -33,6 +33,7 @@ import java.util.Properties;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
@@ -663,7 +664,9 @@ public abstract class Storage extends StorageInfo {
       file.write(jvmName.getBytes(Charsets.UTF_8));
       LOG.info("Lock on " + lockF + " acquired by nodename " + jvmName);
     } catch(OverlappingFileLockException oe) {
-      LOG.error("It appears that another namenode " + file.readLine()
+      // Cannot read from the locked file on Windows.
+      String lockingJvmName = Path.WINDOWS ? "" : (" " + file.readLine());
+      LOG.error("It appears that another namenode" + lockingJvmName
           + " has already locked the storage directory");
       file.close();
       return null;

View File

@@ -99,7 +99,6 @@ import org.apache.hadoop.hdfs.web.resources.UserParam;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -212,7 +211,6 @@ public class NamenodeWebHdfsMethods {
         namenode, ugi, renewer != null? renewer: ugi.getShortUserName());
     final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
     t.setKind(WebHdfsFileSystem.TOKEN_KIND);
-    SecurityUtil.setTokenService(t, namenode.getHttpAddress());
     return t;
   }

View File

@@ -84,6 +84,7 @@ import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 import org.mortbay.util.ajax.JSON;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
@@ -108,7 +109,8 @@ public class WebHdfsFileSystem extends FileSystem
   private DelegationTokenRenewer dtRenewer = null;

-  private synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
+  @VisibleForTesting
+  protected synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
     if (dtRenewer == null) {
       dtRenewer = DelegationTokenRenewer.getInstance();
     }
@@ -127,6 +129,7 @@ public class WebHdfsFileSystem extends FileSystem
   private UserGroupInformation ugi;
   private InetSocketAddress nnAddr;
   private URI uri;
+  private boolean hasInitedToken;
   private Token<?> delegationToken;
   private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
   private RetryPolicy retryPolicy = null;
@@ -173,24 +176,26 @@ public class WebHdfsFileSystem extends FileSystem
   protected void initDelegationToken() throws IOException {
     // look for webhdfs token, then try hdfs
     Token<?> token = selectDelegationToken(ugi);
-
-    //since we don't already have a token, go get one
-    boolean createdToken = false;
-    if (token == null) {
-      token = getDelegationToken(null);
-      createdToken = (token != null);
-    }
-
-    // security might be disabled
     if (token != null) {
+      LOG.debug("Found existing DT for " + token.getService());
       setDelegationToken(token);
-      if (createdToken) {
+      hasInitedToken = true;
+    }
+  }
+
+  protected synchronized Token<?> getDelegationToken() throws IOException {
+    if (!hasInitedToken) {
+      //since we don't already have a token, go get one
+      Token<?> token = getDelegationToken(null);
+      // security might be disabled
+      if (token != null) {
+        setDelegationToken(token);
         addRenewAction(this);
         LOG.debug("Created new DT for " + token.getService());
-      } else {
-        LOG.debug("Found existing DT for " + token.getService());
       }
+      hasInitedToken = true;
     }
+    return delegationToken;
   }
@@ -338,20 +343,13 @@ public class WebHdfsFileSystem extends FileSystem
     List<Param<?,?>> authParams = Lists.newArrayList();
     // Skip adding delegation token for token operations because these
     // operations require authentication.
-    boolean hasToken = false;
-    if (UserGroupInformation.isSecurityEnabled() &&
-        op != GetOpParam.Op.GETDELEGATIONTOKEN &&
-        op != PutOpParam.Op.RENEWDELEGATIONTOKEN &&
-        op != PutOpParam.Op.CANCELDELEGATIONTOKEN) {
-      synchronized (this) {
-        hasToken = (delegationToken != null);
-        if (hasToken) {
-          final String encoded = delegationToken.encodeToUrlString();
-          authParams.add(new DelegationParam(encoded));
-        } // else we are talking to an insecure cluster
-      }
+    Token<?> token = null;
+    if (UserGroupInformation.isSecurityEnabled() && !op.getRequireAuth()) {
+      token = getDelegationToken();
     }
-    if (!hasToken) {
+    if (token != null) {
+      authParams.add(new DelegationParam(token.encodeToUrlString()));
+    } else {
       UserGroupInformation userUgi = ugi;
       UserGroupInformation realUgi = userUgi.getRealUser();
       if (realUgi != null) { // proxy user
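The shape of this change is a lazy-init guard: initialize() only adopts a token that already exists in the UGI, and the first operation that actually needs one fetches it exactly once; ops with getRequireAuth() == true skip the fetch entirely, which is what breaks the token-for-getting-a-token recursion. A self-contained sketch of the pattern (names are illustrative, not the WebHDFS API):

public class LazyTokenHolder {
  private boolean hasInitedToken;
  private String delegationToken;   // stands in for Token<?>

  // Called at initialization: adopt an existing credential, never fetch.
  void init(String existing) {
    if (existing != null) {
      delegationToken = existing;
      hasInitedToken = true;
    }
  }

  // Called on first real use: fetch once, even if the fetch yields null
  // (e.g. security disabled), so later operations never retry the fetch.
  synchronized String get() {
    if (!hasInitedToken) {
      delegationToken = fetchFromServer();
      hasInitedToken = true;
    }
    return delegationToken;
  }

  private String fetchFromServer() {
    return "fetched-token";   // placeholder for getDelegationToken(null)
  }
}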

View File

@@ -38,6 +38,11 @@ public class DeleteOpParam extends HttpOpParam<DeleteOpParam.Op> {
       return HttpOpParam.Type.DELETE;
     }

+    @Override
+    public boolean getRequireAuth() {
+      return false;
+    }
+
     @Override
     public boolean getDoOutput() {
       return false;

View File

@@ -31,7 +31,7 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
     GETFILECHECKSUM(true, HttpURLConnection.HTTP_OK),

     GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK),
-    GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK),
+    GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK, true),

     /** GET_BLOCK_LOCATIONS is a private unstable op. */
     GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
@@ -40,16 +40,28 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> {

     final boolean redirect;
     final int expectedHttpResponseCode;
+    final boolean requireAuth;

     Op(final boolean redirect, final int expectedHttpResponseCode) {
+      this(redirect, expectedHttpResponseCode, false);
+    }
+
+    Op(final boolean redirect, final int expectedHttpResponseCode,
+        final boolean requireAuth) {
       this.redirect = redirect;
       this.expectedHttpResponseCode = expectedHttpResponseCode;
+      this.requireAuth = requireAuth;
     }

     @Override
     public HttpOpParam.Type getType() {
       return HttpOpParam.Type.GET;
     }

+    @Override
+    public boolean getRequireAuth() {
+      return requireAuth;
+    }
+
     @Override
     public boolean getDoOutput() {

View File

@@ -43,6 +43,9 @@ public abstract class HttpOpParam<E extends Enum<E> & HttpOpParam.Op>
     /** @return the Http operation type. */
     public Type getType();

+    /** @return true if the operation cannot use a token */
+    public boolean getRequireAuth();
+
     /** @return true if the operation will do output. */
     public boolean getDoOutput();
@@ -92,6 +95,11 @@ public abstract class HttpOpParam<E extends Enum<E> & HttpOpParam.Op>
       return op.getType();
     }

+    @Override
+    public boolean getRequireAuth() {
+      return op.getRequireAuth();
+    }
+
     @Override
     public boolean getDoOutput() {
       return op.getDoOutput();

View File

@@ -41,6 +41,11 @@ public class PostOpParam extends HttpOpParam<PostOpParam.Op> {
     public Type getType() {
       return Type.POST;
     }

+    @Override
+    public boolean getRequireAuth() {
+      return false;
+    }
+
     @Override
     public boolean getDoOutput() {

View File

@@ -34,23 +34,35 @@ public class PutOpParam extends HttpOpParam<PutOpParam.Op> {
     SETPERMISSION(false, HttpURLConnection.HTTP_OK),
     SETTIMES(false, HttpURLConnection.HTTP_OK),

-    RENEWDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK),
-    CANCELDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK),
+    RENEWDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK, true),
+    CANCELDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK, true),

     NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);

     final boolean doOutputAndRedirect;
     final int expectedHttpResponseCode;
+    final boolean requireAuth;

     Op(final boolean doOutputAndRedirect, final int expectedHttpResponseCode) {
+      this(doOutputAndRedirect, expectedHttpResponseCode, false);
+    }
+
+    Op(final boolean doOutputAndRedirect, final int expectedHttpResponseCode,
+        final boolean requireAuth) {
       this.doOutputAndRedirect = doOutputAndRedirect;
       this.expectedHttpResponseCode = expectedHttpResponseCode;
+      this.requireAuth = requireAuth;
     }

     @Override
     public HttpOpParam.Type getType() {
       return HttpOpParam.Type.PUT;
     }

+    @Override
+    public boolean getRequireAuth() {
+      return requireAuth;
+    }
+
     @Override
     public boolean getDoOutput() {
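Across the four op enums, only the three delegation-token operations opt in to requireAuth; everything else keeps the default false from the two-argument constructor. A quick runnable way to dump the resulting matrix (the same loops the new TestWebHdfsTokens tests later in this commit use for their assertions):

import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.PostOpParam;
import org.apache.hadoop.hdfs.web.resources.PutOpParam;

public class RequireAuthMatrix {
  public static void main(String[] args) {
    for (GetOpParam.Op op : GetOpParam.Op.values()) {
      System.out.println("GET    " + op + " requireAuth=" + op.getRequireAuth());
    }
    for (PutOpParam.Op op : PutOpParam.Op.values()) {
      System.out.println("PUT    " + op + " requireAuth=" + op.getRequireAuth());
    }
    for (PostOpParam.Op op : PostOpParam.Op.values()) {
      System.out.println("POST   " + op + " requireAuth=" + op.getRequireAuth());
    }
    for (DeleteOpParam.Op op : DeleteOpParam.Op.values()) {
      System.out.println("DELETE " + op + " requireAuth=" + op.getRequireAuth());
    }
  }
}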

View File

@@ -70,6 +70,7 @@ public class TestDataTransferKeepalive {
         .numDataNodes(1).build();
     fs = cluster.getFileSystem();
     dfsClient = ((DistributedFileSystem)fs).dfs;
+    dfsClient.socketCache.clear();

     String poolId = cluster.getNamesystem().getBlockPoolId();
     dn = cluster.getDataNodes().get(0);

View File

@@ -36,10 +36,7 @@ import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.After;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
 import org.mockito.Mockito;

 public class TestJournal {
@@ -77,7 +74,7 @@ public class TestJournal {
     IOUtils.closeStream(journal);
   }

-  @Test
+  @Test (timeout = 10000)
   public void testEpochHandling() throws Exception {
     assertEquals(0, journal.getLastPromisedEpoch());
     NewEpochResponseProto newEpoch =
@@ -110,7 +107,7 @@ public class TestJournal {
     }
   }

-  @Test
+  @Test (timeout = 10000)
   public void testMaintainCommittedTxId() throws Exception {
     journal.newEpoch(FAKE_NSINFO, 1);
     journal.startLogSegment(makeRI(1), 1);
@@ -125,7 +122,7 @@ public class TestJournal {
     assertEquals(3, journal.getCommittedTxnIdForTests());
   }

-  @Test
+  @Test (timeout = 10000)
   public void testRestartJournal() throws Exception {
     journal.newEpoch(FAKE_NSINFO, 1);
     journal.startLogSegment(makeRI(1), 1);
@@ -149,7 +146,7 @@ public class TestJournal {
     assertEquals(1, newEpoch.getLastSegmentTxId());
   }

-  @Test
+  @Test (timeout = 10000)
   public void testFormatResetsCachedValues() throws Exception {
     journal.newEpoch(FAKE_NSINFO, 12345L);
     journal.startLogSegment(new RequestInfo(JID, 12345L, 1L, 0L), 1L);
@@ -158,6 +155,8 @@ public class TestJournal {
     assertEquals(12345L, journal.getLastWriterEpoch());
     assertTrue(journal.isFormatted());

+    // Close the journal in preparation for reformatting it.
+    journal.close();
     journal.format(FAKE_NSINFO_2);

     assertEquals(0, journal.getLastPromisedEpoch());
@@ -170,7 +169,7 @@ public class TestJournal {
    * before any transactions are written, that the next newEpoch() call
    * returns the prior segment txid as its most recent segment.
    */
-  @Test
+  @Test (timeout = 10000)
   public void testNewEpochAtBeginningOfSegment() throws Exception {
     journal.newEpoch(FAKE_NSINFO, 1);
     journal.startLogSegment(makeRI(1), 1);
@@ -182,7 +181,7 @@ public class TestJournal {
     assertEquals(1, resp.getLastSegmentTxId());
   }

-  @Test
+  @Test (timeout = 10000)
   public void testJournalLocking() throws Exception {
     Assume.assumeTrue(journal.getStorage().getStorageDir(0).isLockSupported());
     StorageDirectory sd = journal.getStorage().getStorageDir(0);
@@ -206,13 +205,14 @@ public class TestJournal {
     // Hence, should be able to create a new Journal in the same dir.
     Journal journal2 = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
     journal2.newEpoch(FAKE_NSINFO, 2);
+    journal2.close();
   }

   /**
    * Test finalizing a segment after some batch of edits were missed.
    * This should fail, since we validate the log before finalization.
    */
-  @Test
+  @Test (timeout = 10000)
   public void testFinalizeWhenEditsAreMissed() throws Exception {
     journal.newEpoch(FAKE_NSINFO, 1);
     journal.startLogSegment(makeRI(1), 1);
@@ -246,7 +246,7 @@ public class TestJournal {
    * Ensure that finalizing a segment which doesn't exist throws the
    * appropriate exception.
    */
-  @Test
+  @Test (timeout = 10000)
   public void testFinalizeMissingSegment() throws Exception {
     journal.newEpoch(FAKE_NSINFO, 1);
     try {
@@ -267,7 +267,7 @@ public class TestJournal {
    * Eventually, the connection comes back, and the NN tries to start a new
    * segment at a higher txid. This should abort the old one and succeed.
    */
-  @Test
+  @Test (timeout = 10000)
   public void testAbortOldSegmentIfFinalizeIsMissed() throws Exception {
     journal.newEpoch(FAKE_NSINFO, 1);
@@ -296,7 +296,7 @@ public class TestJournal {
    * Test behavior of startLogSegment() when a segment with the
    * same transaction ID already exists.
    */
-  @Test
+  @Test (timeout = 10000)
   public void testStartLogSegmentWhenAlreadyExists() throws Exception {
     journal.newEpoch(FAKE_NSINFO, 1);
@@ -345,7 +345,7 @@ public class TestJournal {
     return new RequestInfo(JID, 1, serial, 0);
   }

-  @Test
+  @Test (timeout = 10000)
   public void testNamespaceVerification() throws Exception {
     journal.newEpoch(FAKE_NSINFO, 1);

View File

@@ -170,34 +170,41 @@ public class TestINodeFile {
     long fileLen = 1024;
     replication = 3;
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
-        replication).build();
-    cluster.waitActive();
-    FSNamesystem fsn = cluster.getNamesystem();
-    FSDirectory fsdir = fsn.getFSDirectory();
-    DistributedFileSystem dfs = cluster.getFileSystem();
-
-    // Create a file for test
-    final Path dir = new Path("/dir");
-    final Path file = new Path(dir, "file");
-    DFSTestUtil.createFile(dfs, file, fileLen, replication, 0L);
-
-    // Check the full path name of the INode associating with the file
-    INode fnode = fsdir.getINode(file.toString());
-    assertEquals(file.toString(), fnode.getFullPathName());
-
-    // Call FSDirectory#unprotectedSetQuota which calls
-    // INodeDirectory#replaceChild
-    dfs.setQuota(dir, Long.MAX_VALUE - 1, replication * fileLen * 10);
-    final Path newDir = new Path("/newdir");
-    final Path newFile = new Path(newDir, "file");
-    // Also rename dir
-    dfs.rename(dir, newDir, Options.Rename.OVERWRITE);
-    // /dir/file now should be renamed to /newdir/file
-    fnode = fsdir.getINode(newFile.toString());
-    // getFullPathName can return correct result only if the parent field of
-    // child node is set correctly
-    assertEquals(newFile.toString(), fnode.getFullPathName());
+    MiniDFSCluster cluster = null;
+    try {
+      cluster =
+          new MiniDFSCluster.Builder(conf).numDataNodes(replication).build();
+      cluster.waitActive();
+      FSNamesystem fsn = cluster.getNamesystem();
+      FSDirectory fsdir = fsn.getFSDirectory();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+
+      // Create a file for test
+      final Path dir = new Path("/dir");
+      final Path file = new Path(dir, "file");
+      DFSTestUtil.createFile(dfs, file, fileLen, replication, 0L);
+
+      // Check the full path name of the INode associating with the file
+      INode fnode = fsdir.getINode(file.toString());
+      assertEquals(file.toString(), fnode.getFullPathName());
+
+      // Call FSDirectory#unprotectedSetQuota which calls
+      // INodeDirectory#replaceChild
+      dfs.setQuota(dir, Long.MAX_VALUE - 1, replication * fileLen * 10);
+      final Path newDir = new Path("/newdir");
+      final Path newFile = new Path(newDir, "file");
+      // Also rename dir
+      dfs.rename(dir, newDir, Options.Rename.OVERWRITE);
+      // /dir/file now should be renamed to /newdir/file
+      fnode = fsdir.getINode(newFile.toString());
+      // getFullPathName can return correct result only if the parent field of
+      // child node is set correctly
+      assertEquals(newFile.toString(), fnode.getFullPathName());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
   }
@@ -353,41 +360,47 @@ public class TestINodeFile {
     Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
         DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
-        .build();
-    cluster.waitActive();
-
-    FSNamesystem fsn = cluster.getNamesystem();
-    long lastId = fsn.getLastInodeId();
-
-    assertTrue(lastId == 1001);
-
-    // Create one directory and the last inode id should increase to 1002
-    FileSystem fs = cluster.getFileSystem();
-    Path path = new Path("/test1");
-    assertTrue(fs.mkdirs(path));
-    assertTrue(fsn.getLastInodeId() == 1002);
-
-    // Use namenode rpc to create a file
-    NamenodeProtocols nnrpc = cluster.getNameNodeRpc();
-    HdfsFileStatus fileStatus = nnrpc.create("/test1/file", new FsPermission(
-        (short) 0755), "client",
-        new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true,
-        (short) 1, 128 * 1024 * 1024L);
-    assertTrue(fsn.getLastInodeId() == 1003);
-    assertTrue(fileStatus.getFileId() == 1003);
-
-    // Rename doesn't increase inode id
-    Path renamedPath = new Path("/test2");
-    fs.rename(path, renamedPath);
-    assertTrue(fsn.getLastInodeId() == 1003);
-
-    cluster.restartNameNode();
-    cluster.waitActive();
-    // Make sure empty editlog can be handled
-    cluster.restartNameNode();
-    cluster.waitActive();
-    assertTrue(fsn.getLastInodeId() == 1003);
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+
+      FSNamesystem fsn = cluster.getNamesystem();
+      long lastId = fsn.getLastInodeId();
+
+      assertTrue(lastId == 1001);
+
+      // Create one directory and the last inode id should increase to 1002
+      FileSystem fs = cluster.getFileSystem();
+      Path path = new Path("/test1");
+      assertTrue(fs.mkdirs(path));
+      assertTrue(fsn.getLastInodeId() == 1002);
+
+      // Use namenode rpc to create a file
+      NamenodeProtocols nnrpc = cluster.getNameNodeRpc();
+      HdfsFileStatus fileStatus = nnrpc.create("/test1/file", new FsPermission(
+          (short) 0755), "client",
+          new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true,
+          (short) 1, 128 * 1024 * 1024L);
+      assertTrue(fsn.getLastInodeId() == 1003);
+      assertTrue(fileStatus.getFileId() == 1003);
+
+      // Rename doesn't increase inode id
+      Path renamedPath = new Path("/test2");
+      fs.rename(path, renamedPath);
+      assertTrue(fsn.getLastInodeId() == 1003);
+
+      cluster.restartNameNode();
+      cluster.waitActive();
+      // Make sure empty editlog can be handled
+      cluster.restartNameNode();
+      cluster.waitActive();
+      assertTrue(fsn.getLastInodeId() == 1003);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
   }

   @Test

View File

@@ -0,0 +1,202 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.PostOpParam;
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestWebHdfsTokens {
static Configuration conf;
static UserGroupInformation ugi;
@BeforeClass
public static void setup() throws IOException {
conf = new Configuration();
SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
UserGroupInformation.setConfiguration(conf);
ugi = UserGroupInformation.getCurrentUser();
}
@SuppressWarnings("unchecked")
@Test(timeout=1000)
public void testInitWithNoToken() throws IOException {
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
doReturn(null).when(fs).getDelegationToken(anyString());
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
// when not in ugi, don't get one
verify(fs).initDelegationToken();
verify(fs).selectDelegationToken(ugi);
verify(fs, never()).setDelegationToken(any(Token.class));
verify(fs, never()).getDelegationToken();
verify(fs, never()).getDelegationToken(anyString());
}
@SuppressWarnings("unchecked")
@Test(timeout=1000)
public void testInitWithUGIToken() throws IOException {
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
Token<DelegationTokenIdentifier> token = mock(Token.class);
doReturn(token).when(fs).selectDelegationToken(ugi);
doReturn(null).when(fs).getDelegationToken(anyString());
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
// when in the ugi, store it but don't renew it
verify(fs).initDelegationToken();
verify(fs).selectDelegationToken(ugi);
verify(fs).setDelegationToken(token);
verify(fs, never()).getDelegationToken();
verify(fs, never()).getDelegationToken(anyString());
verify(fs, never()).addRenewAction(fs);
}
@SuppressWarnings("unchecked")
@Test(timeout=1000)
public void testInternalGetDelegationToken() throws IOException {
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
Token<DelegationTokenIdentifier> token = mock(Token.class);
doReturn(null).when(fs).selectDelegationToken(ugi);
doReturn(token).when(fs).getDelegationToken(anyString());
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
// get token, store it, and renew it
Token<?> token2 = fs.getDelegationToken();
assertEquals(token2, token);
verify(fs).getDelegationToken(null);
verify(fs).setDelegationToken(token);
verify(fs).addRenewAction(fs);
reset(fs);
// just return token, don't get/set/renew
token2 = fs.getDelegationToken();
assertEquals(token2, token);
verify(fs, never()).getDelegationToken(null);
verify(fs, never()).setDelegationToken(any(Token.class));
verify(fs, never()).addRenewAction(fs);
}
@SuppressWarnings("unchecked")
@Test(timeout=1000)
public void testTokenForNonTokenOp() throws IOException {
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
Token<DelegationTokenIdentifier> token = mock(Token.class);
doReturn(null).when(fs).selectDelegationToken(ugi);
doReturn(token).when(fs).getDelegationToken(null);
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
// should get/set/renew token
fs.toUrl(GetOpParam.Op.OPEN, null);
verify(fs).getDelegationToken();
verify(fs).getDelegationToken(null);
verify(fs).setDelegationToken(token);
verify(fs).addRenewAction(fs);
reset(fs);
// should return prior token
fs.toUrl(GetOpParam.Op.OPEN, null);
verify(fs).getDelegationToken();
verify(fs, never()).getDelegationToken(null);
verify(fs, never()).setDelegationToken(token);
verify(fs, never()).addRenewAction(fs);
}
@Test(timeout=1000)
public void testNoTokenForGetToken() throws IOException {
checkNoTokenForOperation(GetOpParam.Op.GETDELEGATIONTOKEN);
}
@Test(timeout=1000)
public void testNoTokenForCanclToken() throws IOException {
checkNoTokenForOperation(PutOpParam.Op.RENEWDELEGATIONTOKEN);
}
@Test(timeout=1000)
public void testNoTokenForCancelToken() throws IOException {
checkNoTokenForOperation(PutOpParam.Op.CANCELDELEGATIONTOKEN);
}
@SuppressWarnings("unchecked")
private void checkNoTokenForOperation(HttpOpParam.Op op) throws IOException {
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
doReturn(null).when(fs).selectDelegationToken(ugi);
doReturn(null).when(fs).getDelegationToken(null);
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
// do not get a token!
fs.toUrl(op, null);
verify(fs, never()).getDelegationToken();
verify(fs, never()).getDelegationToken(null);
verify(fs, never()).setDelegationToken(any(Token.class));
verify(fs, never()).addRenewAction(fs);
}
@Test(timeout=1000)
public void testGetOpRequireAuth() {
for (HttpOpParam.Op op : GetOpParam.Op.values()) {
boolean expect = (op == GetOpParam.Op.GETDELEGATIONTOKEN);
assertEquals(expect, op.getRequireAuth());
}
}
@Test(timeout=1000)
public void testPutOpRequireAuth() {
for (HttpOpParam.Op op : PutOpParam.Op.values()) {
boolean expect = (op == PutOpParam.Op.RENEWDELEGATIONTOKEN ||
op == PutOpParam.Op.CANCELDELEGATIONTOKEN);
assertEquals(expect, op.getRequireAuth());
}
}
@Test(timeout=1000)
public void testPostOpRequireAuth() {
for (HttpOpParam.Op op : PostOpParam.Op.values()) {
assertFalse(op.getRequireAuth());
}
}
@Test(timeout=1000)
public void testDeleteOpRequireAuth() {
for (HttpOpParam.Op op : DeleteOpParam.Op.values()) {
assertFalse(op.getRequireAuth());
}
}
}

View File

@@ -112,7 +112,7 @@ public class TestWebHdfsUrl {
     WebHdfsFileSystem webhdfs = getWebHdfsFileSystem(ugi, conf);
     Path fsPath = new Path("/");

-    String tokenString = webhdfs.getRenewToken().encodeToUrlString();
+    String tokenString = webhdfs.getDelegationToken().encodeToUrlString();

     // send user
     URL getTokenUrl = webhdfs.toUrl(GetOpParam.Op.GETDELEGATIONTOKEN, fsPath);
@@ -193,7 +193,7 @@ public class TestWebHdfsUrl {
     WebHdfsFileSystem webhdfs = getWebHdfsFileSystem(ugi, conf);
     Path fsPath = new Path("/");

-    String tokenString = webhdfs.getRenewToken().encodeToUrlString();
+    String tokenString = webhdfs.getDelegationToken().encodeToUrlString();

     // send real+effective
     URL getTokenUrl = webhdfs.toUrl(GetOpParam.Op.GETDELEGATIONTOKEN, fsPath);
@@ -379,8 +379,5 @@ public class TestWebHdfsUrl {
     public int getDefaultPort() {
       return super.getDefaultPort();
     }
-
-    // don't automatically get a token
-    @Override
-    protected void initDelegationToken() throws IOException {}
   }
 }

View File

@@ -392,6 +392,10 @@ Release 0.23.7 - UNRELEASED
     YARN-227. Application expiration difficult to debug for end-users
     (Jason Lowe via jeagles)

+    YARN-443. allow OS scheduling priority of NM to be different than the
+    containers it launches (tgraves)
+
   OPTIMIZATIONS

View File

@@ -124,10 +124,10 @@ case $startStop in
     nohup nice -n $YARN_NICENESS "$HADOOP_YARN_HOME"/bin/yarn --config $YARN_CONF_DIR $command "$@" > "$log" 2>&1 < /dev/null &
     echo $! > $pid
     sleep 1
+    head "$log"
     # capture the ulimit output
     echo "ulimit -a" >> $log
     ulimit -a >> $log 2>&1
-    head -30 "$log"
     ;;
   (stop)

View File

@@ -304,6 +304,17 @@ public class YarnConfiguration extends Configuration {
   /** who will execute(launch) the containers.*/
   public static final String NM_CONTAINER_EXECUTOR =
     NM_PREFIX + "container-executor.class";

+  /**
+   * Adjustment to make to the container os scheduling priority.
+   * The valid values for this could vary depending on the platform.
+   * On Linux, higher values mean run the containers at a less
+   * favorable priority than the NM.
+   * The value specified is an int.
+   */
+  public static final String NM_CONTAINER_EXECUTOR_SCHED_PRIORITY =
+    NM_PREFIX + "container-executor.os.sched.priority.adjustment";
+  public static final int DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY = 0;
+
   /** Number of threads container manager uses.*/
   public static final String NM_CONTAINER_MGR_THREAD_COUNT =
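For illustration, wiring up the new knob amounts to setting this property on the NM's configuration; on Linux the value feeds straight into a nice -n prefix (see the ContainerExecutor change below). A hedged sketch — the adjustment value 5 is an arbitrary example, not a recommendation:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class SchedPriorityConfig {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // Example only: run containers 5 niceness levels below the NM itself.
    conf.setInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY, 5);
    System.out.println(conf.getInt(
        YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
        YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY));
  }
}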

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -35,6 +37,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.util.Shell;
@ -184,18 +187,39 @@ public abstract class ContainerExecutor implements Configurable {
}
}
/**
* Return a command to execute the given command in OS shell.
* On Windows, the passed in groupId can be used to launch
* and associate the given groupId in a process group. On
* non-Windows, groupId is ignored.
*/
protected static String[] getRunCommand(String command, String groupId,
Configuration conf) {
boolean containerSchedPriorityIsSet = false;
int containerSchedPriorityAdjustment =
YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY;
if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) !=
null) {
containerSchedPriorityIsSet = true;
containerSchedPriorityAdjustment = conf
.getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
}
if (Shell.WINDOWS) {
return new String[] { Shell.WINUTILS, "task", "create", groupId,
"cmd /c " + command };
} else {
List<String> retCommand = new ArrayList<String>();
if (containerSchedPriorityIsSet) {
retCommand.addAll(Arrays.asList("nice", "-n",
Integer.toString(containerSchedPriorityAdjustment)));
}
retCommand.addAll(Arrays.asList("bash", "-c", command));
return retCommand.toArray(new String[retCommand.size()]);
}
}
/** Return a command for determining if process with specified pid is alive. */
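
For concreteness, a hedged sketch of the new behavior; it assumes a caller placed in the org.apache.hadoop.yarn.server.nodemanager package (getRunCommand is protected static) and a non-Windows host:

    package org.apache.hadoop.yarn.server.nodemanager;

    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class RunCommandDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY, 2);
        // The non-Windows branch prepends the nice prefix, so this prints
        // [nice, -n, 2, bash, -c, echo hello]
        String[] cmd = ContainerExecutor.getRunCommand("echo hello", "group1", conf);
        System.out.println(Arrays.toString(cmd));
      }
    }

When the key is unset, the same call should fall back to the plain bash -c form, which is exactly what the no-priority test further below asserts.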

View File

@ -181,7 +181,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
// Setup command to run
String[] command = getRunCommand(sb.getWrapperScriptPath().toString(),
containerIdStr, this.getConf());
LOG.info("launchContainer: " + Arrays.toString(command));
shExec = new ShellCommandExecutor(

View File

@ -50,6 +50,8 @@ public class LinuxContainerExecutor extends ContainerExecutor {
private String containerExecutorExe;
private LCEResourcesHandler resourcesHandler;
private boolean containerSchedPriorityIsSet = false;
private int containerSchedPriorityAdjustment = 0;
@Override
@ -61,6 +63,13 @@ public class LinuxContainerExecutor extends ContainerExecutor {
conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf);
resourcesHandler.setConf(conf);
if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) != null) {
containerSchedPriorityIsSet = true;
containerSchedPriorityAdjustment = conf
.getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
}
}
/**
@ -114,6 +123,13 @@ public class LinuxContainerExecutor extends ContainerExecutor {
: conf.get(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, defaultPath);
}
protected void addSchedPriorityCommand(List<String> command) {
if (containerSchedPriorityIsSet) {
command.addAll(Arrays.asList("nice", "-n",
Integer.toString(containerSchedPriorityAdjustment)));
}
}
@Override
public void init() throws IOException {
// Send command to executor which will just start up,
@ -145,14 +161,15 @@ public class LinuxContainerExecutor extends ContainerExecutor {
List<String> localDirs, List<String> logDirs)
throws IOException, InterruptedException {
List<String> command = new ArrayList<String>();
addSchedPriorityCommand(command);
command.addAll(Arrays.asList(containerExecutorExe,
user,
Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
appId,
nmPrivateContainerTokensPath.toUri().getPath().toString(),
StringUtils.join(",", localDirs),
StringUtils.join(",", logDirs)));
File jvm = // use same jvm as parent
new File(new File(System.getProperty("java.home"), "bin"), "java");
@ -212,7 +229,9 @@ public class LinuxContainerExecutor extends ContainerExecutor {
try {
Path pidFilePath = getPidFilePath(containerId);
if (pidFilePath != null) {
List<String> command = new ArrayList<String>();
addSchedPriorityCommand(command);
command.addAll(Arrays.asList(
containerExecutorExe, user, Integer
.toString(Commands.LAUNCH_CONTAINER.getValue()), appId,
containerIdStr, containerWorkDir.toString(),
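
The privileged path gets the same prefix: with the adjustment configured, the container-executor binary itself runs under nice. A rough illustration of the assembled launch argv, with placeholder executable path, user, command ordinal, ids, and work dir (the real values come from the container context):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class LaunchArgvShape {
      public static void main(String[] args) {
        List<String> command = new ArrayList<String>();
        // what addSchedPriorityCommand contributes when the key is set to 2
        command.addAll(Arrays.asList("nice", "-n", "2"));
        // placeholder values standing in for the real launch arguments
        command.addAll(Arrays.asList("/usr/local/bin/container-executor",
            "alice", "1", "application_0001", "container_0001",
            "/local/nm/usercache/alice/appcache/application_0001"));
        System.out.println(command);
      }
    }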

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestContainerExecutor {
@Test (timeout = 5000)
public void testRunCommandNoPriority() throws Exception {
Configuration conf = new Configuration();
String[] command = ContainerExecutor.getRunCommand("echo", "group1", conf);
assertTrue("first command should be the run command for the platform",
command[0].equals(Shell.WINUTILS) || command[0].equals("bash"));
}
@Test (timeout = 5000)
public void testRunCommandwithPriority() throws Exception {
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY, 2);
String[] command = ContainerExecutor.getRunCommand("echo", "group1", conf);
if (Shell.WINDOWS) {
// Windows doesn't currently support the priority adjustment
assertEquals("first command should be the run command for the platform",
Shell.WINUTILS, command[0]);
} else {
assertEquals("first command should be nice", "nice", command[0]);
assertEquals("second command should be -n", "-n", command[1]);
assertEquals("third command should be the priority", Integer.toString(2),
command[2]);
}
// test with negative number
conf.setInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY, -5);
command = ContainerExecutor.getRunCommand("echo", "group1", conf);
if (Shell.WINDOWS) {
// Windows doesn't currently support the priority adjustment
assertEquals("first command should be the run command for the platform",
Shell.WINUTILS, command[0]);
} else {
assertEquals("first command should be nice", "nice", command[0]);
assertEquals("second command should be -n", "-n", command[1]);
assertEquals("third command should be the priority", Integer.toString(-5),
command[2]);
}
}
}
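
One operational caveat worth noting alongside these tests: on Linux, a negative adjustment such as -5 asks nice for a higher priority, which ordinarily requires the NodeManager to run with the privileges nice demands for negative increments. The tests above only verify the shape of the generated command, not that the OS will honor it.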

View File

@ -27,6 +27,7 @@ import java.io.FileReader;
import java.io.IOException;
import java.io.LineNumberReader;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
@ -131,8 +132,41 @@ public class TestLinuxContainerExecutorWithMocks {
}
@Test (timeout = 5000)
public void testContainerLaunchWithPriority() throws IOException {
// set the scheduler priority to make sure container launch still works with the nice -n prefix
File f = new File("./src/test/resources/mock-container-executor");
if (!f.canExecute()) {
f.setExecutable(true);
}
String executorPath = f.getAbsolutePath();
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, executorPath);
conf.setInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY, 2);
mockExec.setConf(conf);
List<String> command = new ArrayList<String>();
mockExec.addSchedPriorityCommand(command);
assertEquals("first should be nice", "nice", command.get(0));
assertEquals("second should be -n", "-n", command.get(1));
assertEquals("third should be the priority", Integer.toString(2),
command.get(2));
testContainerLaunch();
}
@Test (timeout = 5000)
public void testLaunchCommandWithoutPriority() throws IOException {
// make sure the command doesn't contain the nice -n prefix, since no
// priority was specified
List<String> command = new ArrayList<String>();
mockExec.addSchedPriorityCommand(command);
assertEquals("addSchedPriority should be empty", 0, command.size());
}
@Test (timeout = 5000)
public void testStartLocalizer() throws IOException {