Merging r1524587 through r1525408 from trunk to branch HDFS-2832.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1525410 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-09-22 18:23:22 +00:00
commit 5133d78c1a
56 changed files with 1597 additions and 113 deletions

View File

@ -4,8 +4,8 @@ Build instructions for Hadoop
Requirements:
* Unix System
* JDK 1.6
* Maven 3.0
* JDK 1.6+
* Maven 3.0 or later
* Findbugs 1.3.9 (if running findbugs)
* ProtocolBuffer 2.5.0
* CMake 2.6 or newer (if compiling native code)
@ -149,6 +149,21 @@ Create a local staging version of the website (in /tmp/hadoop-site)
----------------------------------------------------------------------------------
Handling out of memory errors in builds
----------------------------------------------------------------------------------
If the build process fails with an out of memory error, you should be able to fix
it by increasing the amount of memory used by Maven, which can be done via the
environment variable MAVEN_OPTS.
Here is an example setting to allocate between 256 MB and 512 MB of heap space to
Maven:
export MAVEN_OPTS="-Xms256m -Xmx512m"
----------------------------------------------------------------------------------
Building on OS/X
----------------------------------------------------------------------------------

View File

@ -366,6 +366,12 @@ Release 2.3.0 - UNRELEASED
HADOOP-9350. Hadoop not building against Java7 on OSX
(Robert Kanter via stevel)
HADOOP-9929. Insufficient permissions for a path reported as file not found.
(Contributed by Colin Patrick McCabe)
HADOOP-9791. Add a test case covering long paths for new FileUtil access
check methods (ivanmi)
Release 2.2.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.fs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -50,26 +51,26 @@ class Globber {
this.filter = filter;
}
private FileStatus getFileStatus(Path path) {
private FileStatus getFileStatus(Path path) throws IOException {
try {
if (fs != null) {
return fs.getFileStatus(path);
} else {
return fc.getFileStatus(path);
}
} catch (IOException e) {
} catch (FileNotFoundException e) {
return null;
}
}
private FileStatus[] listStatus(Path path) {
private FileStatus[] listStatus(Path path) throws IOException {
try {
if (fs != null) {
return fs.listStatus(path);
} else {
return fc.util().listStatus(path);
}
} catch (IOException e) {
} catch (FileNotFoundException e) {
return new FileStatus[0];
}
}
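The narrowed catch above is the substance of HADOOP-9929: FileNotFoundException is
still treated as "no match", but other IOExceptions such as AccessControlException
now propagate to callers of globStatus instead of being silently swallowed. A
minimal caller-side sketch, assuming a hypothetical /restricted directory that the
caller cannot list:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.AccessControlException;

public class GlobPermissionSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    try {
      // A FileNotFoundException inside the glob is still reported as
      // "no match" (a null or empty array), not as an error.
      FileStatus[] matches = fs.globStatus(new Path("/restricted/*"));
      System.out.println(matches == null ? 0 : matches.length);
    } catch (AccessControlException ace) {
      // With the narrowed catch, a permission failure now surfaces here
      // instead of being folded into an empty or partial match list.
      System.err.println("cannot list /restricted: " + ace.getMessage());
    }
  }
}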

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.http;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -38,8 +37,7 @@ public class HttpConfig {
CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT);
}
@VisibleForTesting
static void setSecure(boolean secure) {
public static void setSecure(boolean secure) {
sslEnabled = secure;
}

View File

@ -53,6 +53,8 @@ import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Copy-paste of ClientBase from ZooKeeper, but without any of the
* JMXEnv verification. There seems to be a bug ZOOKEEPER-1438
@ -111,7 +113,9 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
synchronized boolean isConnected() {
return connected;
}
synchronized void waitForConnected(long timeout) throws InterruptedException, TimeoutException {
@VisibleForTesting
public synchronized void waitForConnected(long timeout)
throws InterruptedException, TimeoutException {
long expire = Time.now() + timeout;
long left = timeout;
while(!connected && left > 0) {
@ -123,7 +127,9 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
}
}
synchronized void waitForDisconnected(long timeout) throws InterruptedException, TimeoutException {
@VisibleForTesting
public synchronized void waitForDisconnected(long timeout)
throws InterruptedException, TimeoutException {
long expire = Time.now() + timeout;
long left = timeout;
while(connected && left > 0) {

View File

@ -254,6 +254,45 @@ public class TestNativeIO {
File testFile = new File(TEST_DIR, "testfileaccess");
assertTrue(testFile.createNewFile());
// Validate ACCESS_READ
FileUtil.setReadable(testFile, false);
assertFalse(NativeIO.Windows.access(testFile.getAbsolutePath(),
NativeIO.Windows.AccessRight.ACCESS_READ));
FileUtil.setReadable(testFile, true);
assertTrue(NativeIO.Windows.access(testFile.getAbsolutePath(),
NativeIO.Windows.AccessRight.ACCESS_READ));
// Validate ACCESS_WRITE
FileUtil.setWritable(testFile, false);
assertFalse(NativeIO.Windows.access(testFile.getAbsolutePath(),
NativeIO.Windows.AccessRight.ACCESS_WRITE));
FileUtil.setWritable(testFile, true);
assertTrue(NativeIO.Windows.access(testFile.getAbsolutePath(),
NativeIO.Windows.AccessRight.ACCESS_WRITE));
// Validate ACCESS_EXECUTE
FileUtil.setExecutable(testFile, false);
assertFalse(NativeIO.Windows.access(testFile.getAbsolutePath(),
NativeIO.Windows.AccessRight.ACCESS_EXECUTE));
FileUtil.setExecutable(testFile, true);
assertTrue(NativeIO.Windows.access(testFile.getAbsolutePath(),
NativeIO.Windows.AccessRight.ACCESS_EXECUTE));
// Validate that access checks work as expected for long paths
// Assemble a path longer than 260 chars (MAX_PATH)
String testFileRelativePath = "";
for (int i = 0; i < 15; ++i) {
testFileRelativePath += "testfileaccessfolder\\";
}
testFileRelativePath += "testfileaccess";
testFile = new File(TEST_DIR, testFileRelativePath);
assertTrue(testFile.getParentFile().mkdirs());
assertTrue(testFile.createNewFile());
// Validate ACCESS_READ
FileUtil.setReadable(testFile, false);
assertFalse(NativeIO.Windows.access(testFile.getAbsolutePath(),

View File

@ -20,8 +20,8 @@ package org.apache.hadoop.nfs.nfs3;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mount.MountdBase;
import org.apache.hadoop.oncrpc.RpcFrameDecoder;
import org.apache.hadoop.oncrpc.RpcProgram;
import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.SimpleTcpServer;
import org.apache.hadoop.oncrpc.SimpleTcpServerHandler;
import org.apache.hadoop.portmap.PortmapMapping;
@ -68,7 +68,8 @@ public abstract class Nfs3Base {
return new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
return Channels.pipeline(new RpcFrameDecoder(),
return Channels.pipeline(
RpcUtil.constructRpcFrameDecoder(),
new SimpleTcpServerHandler(rpcProgram));
}
};

View File

@ -17,13 +17,65 @@
*/
package org.apache.hadoop.oncrpc;
/**
* The XID in an RPC call. It is used to start with a new seed after each reboot.
*/
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
public class RpcUtil {
/**
* The XID in an RPC call. It is used to start with a new seed after each reboot.
*/
private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
public static int getNewXid(String caller) {
return xid = ++xid + caller.hashCode();
}
public static FrameDecoder constructRpcFrameDecoder() {
return new RpcFrameDecoder();
}
static class RpcFrameDecoder extends FrameDecoder {
public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
private ChannelBuffer currentFrame;
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buf) {
if (buf.readableBytes() < 4)
return null;
buf.markReaderIndex();
byte[] fragmentHeader = new byte[4];
buf.readBytes(fragmentHeader);
int length = XDR.fragmentSize(fragmentHeader);
boolean isLast = XDR.isLastFragment(fragmentHeader);
if (buf.readableBytes() < length) {
buf.resetReaderIndex();
return null;
}
ChannelBuffer newFragment = buf.readSlice(length);
if (currentFrame == null) {
currentFrame = newFragment;
} else {
currentFrame = ChannelBuffers.wrappedBuffer(currentFrame, newFragment);
}
if (isLast) {
ChannelBuffer completeFrame = currentFrame;
currentFrame = null;
return completeFrame;
} else {
return null;
}
}
}
}
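The decoder above reassembles ONC RPC record-marked streams: each fragment is
preceded by a 4-byte header whose high bit flags the last fragment and whose
remaining 31 bits carry the fragment length. A minimal sketch of that header
parsing, which XDR.fragmentSize and XDR.isLastFragment are assumed to implement
(per the record marking convention of RFC 5531); the methods here are
hypothetical stand-ins, not the actual XDR code:

class RecordMarkingSketch {
  // Low 31 bits of the big-endian header give the fragment length.
  static int fragmentSize(byte[] header) {
    return ((header[0] & 0x7F) << 24) | ((header[1] & 0xFF) << 16)
        | ((header[2] & 0xFF) << 8) | (header[3] & 0xFF);
  }

  // High bit set means this fragment completes the record.
  static boolean isLastFragment(byte[] header) {
    return (header[0] & 0x80) != 0;
  }
}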

View File

@ -20,8 +20,6 @@ package org.apache.hadoop.oncrpc;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.apache.hadoop.oncrpc.RpcFrameDecoder;
import org.apache.hadoop.oncrpc.XDR;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
@ -55,7 +53,8 @@ public class SimpleTcpClient {
this.pipelineFactory = new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
return Channels.pipeline(new RpcFrameDecoder(),
return Channels.pipeline(
RpcUtil.constructRpcFrameDecoder(),
new SimpleTcpClientHandler(request));
}
};

View File

@ -57,7 +57,8 @@ public class SimpleTcpServer {
return new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
return Channels.pipeline(new RpcFrameDecoder(),
return Channels.pipeline(
RpcUtil.constructRpcFrameDecoder(),
new SimpleTcpServerHandler(rpcProgram));
}
};

View File

@ -44,7 +44,7 @@ public class SimpleTcpServerHandler extends SimpleChannelHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
XDR request = new XDR(buf.array());
XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING);
InetAddress remoteInetAddr = ((InetSocketAddress) ctx.getChannel()
.getRemoteAddress()).getAddress();

View File

@ -43,7 +43,7 @@ public class SimpleUdpServerHandler extends SimpleChannelHandler {
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
XDR request = new XDR(buf.array());
XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING);
InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress())
.getAddress();

View File

@ -46,7 +46,7 @@ public final class XDR {
private ByteBuffer buf;
private enum State {
public enum State {
READING, WRITING,
}
@ -66,7 +66,7 @@ public final class XDR {
this(DEFAULT_INITIAL_CAPACITY);
}
private XDR(ByteBuffer buf, State state) {
public XDR(ByteBuffer buf, State state) {
this.buf = buf;
this.state = state;
}

View File

@ -18,12 +18,14 @@
package org.apache.hadoop.oncrpc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
import org.apache.hadoop.oncrpc.security.CredentialsNone;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
@ -138,7 +140,7 @@ public class TestFrameDecoder {
buf);
assertTrue(channelBuffer != null);
// Complete frame should have the total size 10+10=20
assertTrue(channelBuffer.array().length == 20);
assertEquals(20, channelBuffer.readableBytes());
}
@Test
@ -195,4 +197,4 @@ public class TestFrameDecoder {
* static void testDump() { XDR xdr_out = new XDR();
* createPortmapXDRheader(xdr_out, 4); testRequest(xdr_out); }
*/
}
}

View File

@ -17,10 +17,9 @@
*/
package org.apache.hadoop.oncrpc;
import org.junit.Assert;
import org.junit.Test;
import junit.framework.Assert;
public class TestXDR {
private void serializeInt(int times) {
XDR w = new XDR();

View File

@ -33,8 +33,8 @@ import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
import org.apache.hadoop.oncrpc.RegistrationClient;
import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcFrameDecoder;
import org.apache.hadoop.oncrpc.RpcReply;
import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.SimpleTcpClient;
import org.apache.hadoop.oncrpc.SimpleTcpClientHandler;
import org.apache.hadoop.oncrpc.XDR;
@ -136,8 +136,9 @@ public class TestOutOfOrderWrite {
protected ChannelPipelineFactory setPipelineFactory() {
this.pipelineFactory = new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
return Channels.pipeline(new RpcFrameDecoder(), new WriteHandler(
request));
return Channels.pipeline(
RpcUtil.constructRpcFrameDecoder(),
new WriteHandler(request));
}
};
return this.pipelineFactory;

View File

@ -366,6 +366,9 @@ Release 2.1.1-beta - 2013-09-23
HDFS-5199 Add more debug trace for NFS READ and WRITE. (brandonli)
HDFS-5234 Move RpcFrameDecoder out of the public API.
(Haohui Mai via brandonli)
IMPROVEMENTS
HDFS-4513. Clarify in the WebHDFS REST API that all JSON responses may
@ -479,6 +482,9 @@ Release 2.1.1-beta - 2013-09-23
HDFS-5219. Add configuration keys for retry policy in WebHDFSFileSystem.
(Haohui Mai via jing9)
HDFS-5231. Fix broken links in the HDFS Federation document. (Haohui Mai
via jing9)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -28,7 +28,7 @@ HDFS Federation
* {Background}
[./federation-background.gif] HDFS Layers
[./images/federation-background.gif] HDFS Layers
HDFS has two main layers:
@ -72,7 +72,7 @@ HDFS Federation
send periodic heartbeats and block reports and handles commands from the
Namenodes.
[./federation.gif] HDFS Federation Architecture
[./images/federation.gif] HDFS Federation Architecture
<<Block Pool>>

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.fs;
import static org.junit.Assert.*;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
@ -27,10 +28,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.*;
public class TestGlobPaths {
private static final UserGroupInformation unprivilegedUser =
UserGroupInformation.createRemoteUser("myuser");
static class RegexPathFilter implements PathFilter {
private final String regex;
@ -47,17 +53,33 @@ public class TestGlobPaths {
static private MiniDFSCluster dfsCluster;
static private FileSystem fs;
static private FileSystem unprivilegedFs;
static private FileContext fc;
static private FileContext unprivilegedFc;
static final private int NUM_OF_PATHS = 4;
static private String USER_DIR;
private Path[] path = new Path[NUM_OF_PATHS];
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = new HdfsConfiguration();
final Configuration conf = new HdfsConfiguration();
dfsCluster = new MiniDFSCluster.Builder(conf).build();
fs = FileSystem.get(conf);
unprivilegedFs =
unprivilegedUser.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws IOException {
return FileSystem.get(conf);
}
});
fc = FileContext.getFileContext(conf);
unprivilegedFc =
unprivilegedUser.doAs(new PrivilegedExceptionAction<FileContext>() {
@Override
public FileContext run() throws IOException {
return FileContext.getFileContext(conf);
}
});
USER_DIR = fs.getHomeDirectory().toUri().getPath().toString();
}
@ -781,8 +803,8 @@ public class TestGlobPaths {
* A glob test that can be run on either FileContext or FileSystem.
*/
private static interface FSTestWrapperGlobTest {
void run(FSTestWrapper wrap, FileSystem fs, FileContext fc)
throws Exception;
void run(FSTestWrapper wrap, FSTestWrapper unprivilegedWrapper,
FileSystem fs, FileContext fc) throws Exception;
}
/**
@ -791,7 +813,8 @@ public class TestGlobPaths {
private void testOnFileSystem(FSTestWrapperGlobTest test) throws Exception {
try {
fc.mkdir(new Path(USER_DIR), FsPermission.getDefault(), true);
test.run(new FileSystemTestWrapper(fs), fs, null);
test.run(new FileSystemTestWrapper(fs),
new FileSystemTestWrapper(unprivilegedFs), fs, null);
} finally {
fc.delete(new Path(USER_DIR), true);
}
@ -803,7 +826,8 @@ public class TestGlobPaths {
private void testOnFileContext(FSTestWrapperGlobTest test) throws Exception {
try {
fs.mkdirs(new Path(USER_DIR));
test.run(new FileContextTestWrapper(fc), null, fc);
test.run(new FileContextTestWrapper(fc),
new FileContextTestWrapper(unprivilegedFc), null, fc);
} finally {
cleanupDFS();
}
@ -834,8 +858,8 @@ public class TestGlobPaths {
* Test globbing through symlinks.
*/
private static class TestGlobWithSymlinks implements FSTestWrapperGlobTest {
public void run(FSTestWrapper wrap, FileSystem fs, FileContext fc)
throws Exception {
public void run(FSTestWrapper wrap, FSTestWrapper unprivilegedWrap,
FileSystem fs, FileContext fc) throws Exception {
// Test that globbing through a symlink to a directory yields a path
// containing that symlink.
wrap.mkdir(new Path(USER_DIR + "/alpha"), FsPermission.getDirDefault(),
@ -886,8 +910,8 @@ public class TestGlobPaths {
*/
private static class TestGlobWithSymlinksToSymlinks implements
FSTestWrapperGlobTest {
public void run(FSTestWrapper wrap, FileSystem fs, FileContext fc)
throws Exception {
public void run(FSTestWrapper wrap, FSTestWrapper unprivilegedWrap,
FileSystem fs, FileContext fc) throws Exception {
// Test that globbing through a symlink to a symlink to a directory
// fully resolves
wrap.mkdir(new Path(USER_DIR + "/alpha"), FsPermission.getDirDefault(),
@ -961,8 +985,8 @@ public class TestGlobPaths {
*/
private static class TestGlobSymlinksWithCustomPathFilter implements
FSTestWrapperGlobTest {
public void run(FSTestWrapper wrap, FileSystem fs, FileContext fc)
throws Exception {
public void run(FSTestWrapper wrap, FSTestWrapper unprivilegedWrap,
FileSystem fs, FileContext fc) throws Exception {
// Test that globbing through a symlink to a symlink to a directory
// fully resolves
wrap.mkdir(new Path(USER_DIR + "/alpha"), FsPermission.getDirDefault(),
@ -1009,8 +1033,8 @@ public class TestGlobPaths {
* Test that globStatus fills in the scheme even when it is not provided.
*/
private static class TestGlobFillsInScheme implements FSTestWrapperGlobTest {
public void run(FSTestWrapper wrap, FileSystem fs, FileContext fc)
throws Exception {
public void run(FSTestWrapper wrap, FSTestWrapper unprivilegedWrap,
FileSystem fs, FileContext fc) throws Exception {
// Verify that the default scheme is hdfs, when we don't supply one.
wrap.mkdir(new Path(USER_DIR + "/alpha"), FsPermission.getDirDefault(),
false);
@ -1052,8 +1076,8 @@ public class TestGlobPaths {
* Test that globStatus works with relative paths.
**/
private static class TestRelativePath implements FSTestWrapperGlobTest {
public void run(FSTestWrapper wrap, FileSystem fs, FileContext fc)
throws Exception {
public void run(FSTestWrapper wrap, FSTestWrapper unprivilegedWrap,
FileSystem fs, FileContext fc) throws Exception {
String[] files = new String[] { "a", "abc", "abc.p", "bacd" };
Path[] path = new Path[files.length];
@ -1086,4 +1110,44 @@ public class TestGlobPaths {
public void testRelativePathOnFC() throws Exception {
testOnFileContext(new TestRelativePath());
}
/**
* Test that trying to glob through a directory we don't have permission
* to list fails with AccessControlException rather than succeeding or
* throwing any other exception.
**/
private static class TestGlobAccessDenied implements FSTestWrapperGlobTest {
public void run(FSTestWrapper wrap, FSTestWrapper unprivilegedWrap,
FileSystem fs, FileContext fc) throws Exception {
wrap.mkdir(new Path("/nopermission/val"),
new FsPermission((short)0777), true);
wrap.mkdir(new Path("/norestrictions/val"),
new FsPermission((short)0777), true);
wrap.setPermission(new Path("/nopermission"),
new FsPermission((short)0));
try {
unprivilegedWrap.globStatus(new Path("/no*/*"),
new AcceptAllPathFilter());
Assert.fail("expected to get an AccessControlException when " +
"globbing through a directory we don't have permissions " +
"to list.");
} catch (AccessControlException ioe) {
}
Assert.assertEquals("/norestrictions/val",
TestPath.mergeStatuses(unprivilegedWrap.globStatus(
new Path("/norestrictions/*"),
new AcceptAllPathFilter())));
}
}
@Test
public void testGlobAccessDeniedOnFS() throws Exception {
testOnFileSystem(new TestGlobAccessDenied());
}
@Test
public void testGlobAccessDeniedOnFC() throws Exception {
testOnFileContext(new TestGlobAccessDenied());
}
}

View File

@ -190,6 +190,17 @@ Release 2.2.0 - UNRELEASED
BUG FIXES
MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta
via tgraves)
MAPREDUCE-5488. Changed MR client to keep trying to reach the application
when it sees that an attempt's AM is down. (Jian He via vinodkv)
MAPREDUCE-5515. Fixed MR AM's webapp to depend on a new config
mapreduce.ssl.enabled to enable https, disabled by default, as the MR AM
needs to set up its own certificates etc. and cannot depend on the
cluster's. (Omkar Vinit Joshi via vinodkv)
Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES
@ -1365,6 +1376,9 @@ Release 0.23.10 - UNRELEASED
MAPREDUCE-5475. MRClientService does not verify ACLs properly (jlowe)
MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta
via tgraves)
Release 0.23.9 - 2013-07-08
INCOMPATIBLE CHANGES

View File

@ -496,6 +496,12 @@
<Field name="sslFileBufferSize" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.mapred.ClientServiceDelegate" />
<Method name="invoke" />
<Bug pattern="SWL_SLEEP_WITH_LOCK_HELD" />
</Match>
<Match>
<Class name="org.apache.hadoop.mapreduce.util.ProcessTree" />

View File

@ -36,14 +36,17 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
@ -101,6 +104,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@ -1313,6 +1317,7 @@ public class MRAppMaster extends CompositeService {
containerId.getApplicationAttemptId();
long appSubmitTime = Long.parseLong(appSubmitTimeStr);
MRAppMaster appMaster =
new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
Integer.parseInt(nodePortString),
@ -1322,6 +1327,16 @@ public class MRAppMaster extends CompositeService {
new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
JobConf conf = new JobConf(new YarnConfiguration());
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
// Explicitly disabling SSL for map reduce tasks as we can't allow MR users
// to gain access to the keystore file for opening an SSL listener. We can
// trust the RM/NM to issue SSL certificates, but definitely not the MR-AM,
// as it is running in user-land.
HttpConfig.setSecure(conf.getBoolean(MRConfig.SSL_ENABLED_KEY,
MRConfig.SSL_ENABLED_KEY_DEFAULT));
WebAppUtil.setSSLEnabledInYARN(conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY,
CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT));
// log the system properties
String systemPropsToLog = MRApps.getSystemPropertiesToLog(conf);

View File

@ -27,8 +27,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
@ -78,6 +80,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp;
import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;

View File

@ -28,7 +28,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@ -36,12 +38,10 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@ -58,8 +58,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import com.sun.research.ws.wadl.Response;
/**
* Registers/unregisters to RM and sends heartbeats to RM.
*/
@ -148,7 +146,13 @@ public abstract class RMCommunicator extends AbstractService
if (serviceAddr != null) {
request.setHost(serviceAddr.getHostName());
request.setRpcPort(serviceAddr.getPort());
request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
String scheme = "http://";
if (getConfig().getBoolean(MRConfig.SSL_ENABLED_KEY,
MRConfig.SSL_ENABLED_KEY_DEFAULT)) {
scheme = "https://";
}
request.setTrackingUrl(scheme + serviceAddr.getHostName() + ":"
+ clientService.getHttpPort());
}
RegisterApplicationMasterResponse response =
scheduler.registerApplicationMaster(request);
@ -190,10 +194,11 @@ public abstract class RMCommunicator extends AbstractService
}
LOG.info("Setting job diagnostics to " + sb.toString());
String historyUrl = JobHistoryUtils.getHistoryUrl(getConfig(),
context.getApplicationID());
String historyUrl =
WebAppUtil.getSchemePrefix()
+ JobHistoryUtils.getHistoryUrl(getConfig(),
context.getApplicationID());
LOG.info("History url is " + historyUrl);
FinishApplicationMasterRequest request =
FinishApplicationMasterRequest.newInstance(finishState,
sb.toString(), historyUrl);

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.Controller;
import org.apache.hadoop.yarn.webapp.View;
import com.google.common.base.Joiner;
import com.google.inject.Inject;
/**
@ -50,6 +51,7 @@ import com.google.inject.Inject;
*/
public class AppController extends Controller implements AMParams {
private static final Log LOG = LogFactory.getLog(AppController.class);
private static final Joiner JOINER = Joiner.on("");
protected final App app;
@ -58,7 +60,9 @@ public class AppController extends Controller implements AMParams {
super(ctx);
this.app = app;
set(APP_ID, app.context.getApplicationID().toString());
set(RM_WEB, YarnConfiguration.getRMWebAppURL(conf));
set(RM_WEB,
JOINER.join(WebAppUtil.getSchemePrefix(),
YarnConfiguration.getRMWebAppHostAndPort(conf)));
}
@Inject

View File

@ -104,7 +104,7 @@ public class JobBlock extends HtmlBlock {
table.tr().
td(String.valueOf(attempt.getAttemptId())).
td(new Date(attempt.getStartTime()).toString()).
td().a(".nodelink", url(HttpConfig.getSchemePrefix(),
td().a(".nodelink", url(WebAppUtil.getSchemePrefix(),
attempt.getNodeHttpAddress()),
attempt.getNodeHttpAddress())._().
td().a(".logslink", url(attempt.getLogsLink()),

View File

@ -63,7 +63,7 @@ public class NavBlock extends HtmlBlock {
li().a(url("conf", jobid), "Configuration")._().
li().a(url("tasks", jobid, "m"), "Map tasks")._().
li().a(url("tasks", jobid, "r"), "Reduce tasks")._().
li().a(".logslink", url(HttpConfig.getSchemePrefix(),
li().a(".logslink", url(WebAppUtil.getSchemePrefix(),
nodeHttpAddress, "node",
"containerlogs", thisAmInfo.getContainerId().toString(),
app.getJob().getUserName()),

View File

@ -86,12 +86,12 @@ public class TaskPage extends AppView {
.append(ta.getState().toString()).append("\",\"")
.append(nodeHttpAddr == null ? "N/A" :
"<a class='nodelink' href='" + HttpConfig.getSchemePrefix() + nodeHttpAddr + "'>"
"<a class='nodelink' href='" + WebAppUtil.getSchemePrefix() + nodeHttpAddr + "'>"
+ nodeHttpAddr + "</a>")
.append("\",\"")
.append(ta.getAssignedContainerId() == null ? "N/A" :
"<a class='logslink' href='" + url(HttpConfig.getSchemePrefix(), nodeHttpAddr, "node"
"<a class='logslink' href='" + url(WebAppUtil.getSchemePrefix(), nodeHttpAddr, "node"
, "containerlogs", ta.getAssignedContainerIdStr(), app.getJob()
.getUserName()) + "'>logs</a>")
.append("\",\"")

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.webapp;
public class WebAppUtil {
private static boolean isSSLEnabledInYARN;
public static void setSSLEnabledInYARN(boolean isSSLEnabledInYARN) {
WebAppUtil.isSSLEnabledInYARN = isSSLEnabledInYARN;
}
public static boolean isSSLEnabledInYARN() {
return isSSLEnabledInYARN;
}
public static String getSchemePrefix() {
if (isSSLEnabledInYARN) {
return "https://";
} else {
return "http://";
}
}
}
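WebAppUtil decouples the scheme used in MR web links from HttpConfig's global
flag: links to YARN daemons follow YARN's SSL setting even when the AM itself is
not serving SSL. A minimal usage sketch, with a hypothetical node address (in
the AM webapp code this comes from the task attempt's getNodeHttpAddress()):

class SchemePrefixSketch {
  static String nodeLink(String nodeHttpAddr) {
    // "http://node.example.com:8042" by default; "https://..." once
    // setSSLEnabledInYARN(true) has been called at AM/JHS startup.
    return WebAppUtil.getSchemePrefix() + nodeHttpAddr;
  }
}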

View File

@ -26,6 +26,7 @@ import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -63,7 +64,7 @@ public class AMAttemptInfo {
ContainerId containerId = amInfo.getContainerId();
if (containerId != null) {
this.containerId = containerId.toString();
this.logsLink = join(HttpConfig.getSchemePrefix() + nodeHttpAddress,
this.logsLink = join(WebAppUtil.getSchemePrefix() + nodeHttpAddress,
ujoin("node", "containerlogs", this.containerId, user));
}
}

View File

@ -469,7 +469,7 @@ public class TypeConverter {
QueueInfo toReturn = new QueueInfo(queueInfo.getQueueName(), "Capacity: " +
queueInfo.getCapacity() * 100 + ", MaximumCapacity: " +
(queueInfo.getMaximumCapacity() < 0 ? "UNDEFINED" :
queueInfo.getMaximumCapacity()) + ", CurrentCapacity: " +
queueInfo.getMaximumCapacity() * 100) + ", CurrentCapacity: " +
queueInfo.getCurrentCapacity() * 100, fromYarn(queueInfo.getQueueState()),
TypeConverter.fromYarnApps(queueInfo.getApplications(), conf));
List<QueueInfo> childQueues = new ArrayList<QueueInfo>();

View File

@ -84,6 +84,11 @@ public interface MRConfig {
"mapreduce.shuffle.ssl.enabled";
public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
public static final String SSL_ENABLED_KEY =
"mapreduce.ssl.enabled";
public static final boolean SSL_ENABLED_KEY_DEFAULT = false;
public static final String SHUFFLE_CONSUMER_PLUGIN =
"mapreduce.job.reduce.shuffle.consumer.plugin.class";

View File

@ -357,7 +357,7 @@ public interface MRJobConfig {
public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3;
/**
* The number of client retries to the RM/HS/AM before throwing exception.
* The number of client retries to the RM/HS before throwing exception.
*/
public static final String MR_CLIENT_MAX_RETRIES =
MR_PREFIX + "client.max-retries";

View File

@ -289,6 +289,20 @@
</description>
</property>
<property>
<name>mapreduce.ssl.enabled</name>
<value>false</value>
<description>
If enabled, the MapReduce application master's http server will be
started with SSL enabled. The MapReduce AM does not support SSL by default.
If MapReduce jobs want SSL support, it is the user's responsibility to
create and manage certificates, keystores and trust-stores with appropriate
permissions. This applies only to the MapReduce application master and is
not used by the job history server. This property is not required to enable
encrypted shuffle; for that, refer to the mapreduce.shuffle.ssl.enabled
property instead.
</description>
</property>
<property>
<name>mapreduce.shuffle.ssl.file.buffer.size</name>
<value>65536</value>
@ -982,7 +996,7 @@
<property>
<name>yarn.app.mapreduce.client-am.ipc.max-retries</name>
<value>1</value>
<value>3</value>
<description>The number of client retries to the AM - before reconnecting
to the RM to fetch Application Status.</description>
</property>
@ -990,7 +1004,7 @@
<property>
<name>yarn.app.mapreduce.client.max-retries</name>
<value>3</value>
<description>The number of client retries to the RM/HS/AM before
<description>The number of client retries to the RM/HS before
throwing exception. This is a layer above the ipc.</description>
</property>
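The mapreduce.ssl.enabled property above is surfaced in code as
MRConfig.SSL_ENABLED_KEY (see the MRConfig hunk earlier in this commit), which
MRAppMaster feeds into HttpConfig.setSecure. A sketch of opting a job in
programmatically, under the description's caveat that certificates and
keystores are the user's responsibility:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRConfig;

public class SslEnabledJobConfSketch {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Ask the MR AM to start its web server with SSL enabled.
    conf.setBoolean(MRConfig.SSL_ENABLED_KEY, true);
    System.out.println(conf.getBoolean(MRConfig.SSL_ENABLED_KEY,
        MRConfig.SSL_ENABLED_KEY_DEFAULT));
  }
}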

View File

@ -24,8 +24,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil;
import org.apache.hadoop.mapreduce.v2.hs.server.HSAdminServer;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -73,6 +75,10 @@ public class JobHistoryServer extends CompositeService {
config.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
// This is required for WebApps to use https if enabled.
WebAppUtil.setSSLEnabledInYARN(conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY,
CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT));
try {
doSecureLogin(conf);
} catch(IOException ie) {

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ConfEntryInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
@ -132,7 +133,7 @@ public class HsJobBlock extends HtmlBlock {
table.tr((odd = !odd) ? _ODD : _EVEN).
td(String.valueOf(attempt.getAttemptId())).
td(new Date(attempt.getStartTime()).toString()).
td().a(".nodelink", url(HttpConfig.getSchemePrefix(),
td().a(".nodelink", url(WebAppUtil.getSchemePrefix(),
attempt.getNodeHttpAddress()),
attempt.getNodeHttpAddress())._().
td().a(".logslink", url(attempt.getShortLogsLink()),

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.webapp.App;
import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.util.Times;
@ -148,7 +149,7 @@ public class HsTaskPage extends HsView {
.append(sortId + " ").append(taid).append("\",\"")
.append(ta.getState().toString()).append("\",\"")
.append("<a class='nodelink' href='" + HttpConfig.getSchemePrefix() + nodeHttpAddr + "'>")
.append("<a class='nodelink' href='" + WebAppUtil.getSchemePrefix() + nodeHttpAddr + "'>")
.append(nodeRackName + "/" + nodeHttpAddr + "</a>\",\"")
.append("<a class='logslink' href='").append(url("logs", nodeIdString

View File

@ -26,6 +26,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@ -77,6 +78,8 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
public class ClientServiceDelegate {
private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
private static final String UNAVAILABLE = "N/A";
@ -93,7 +96,8 @@ public class ClientServiceDelegate {
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static String UNKNOWN_USER = "Unknown User";
private String trackingUrl;
private AtomicBoolean usingAMProxy = new AtomicBoolean(false);
private int maxClientRetry;
private boolean amAclDisabledStatusLogged = false;
public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
@ -287,6 +291,7 @@ public class ClientServiceDelegate {
MRClientProtocol proxy =
(MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
serviceAddr, conf);
usingAMProxy.set(true);
LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
return proxy;
}
@ -301,13 +306,15 @@ public class ClientServiceDelegate {
} catch (NoSuchMethodException e) {
throw new YarnRuntimeException("Method name mismatch", e);
}
int maxRetries = this.conf.getInt(
maxClientRetry = this.conf.getInt(
MRJobConfig.MR_CLIENT_MAX_RETRIES,
MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
IOException lastException = null;
while (maxRetries > 0) {
while (maxClientRetry > 0) {
MRClientProtocol MRClientProxy = null;
try {
return methodOb.invoke(getProxy(), args);
MRClientProxy = getProxy();
return methodOb.invoke(MRClientProxy, args);
} catch (InvocationTargetException e) {
// Will not throw out YarnException anymore
LOG.debug("Failed to contact AM/History for job " + jobId +
@ -315,22 +322,44 @@ public class ClientServiceDelegate {
// Force reconnection by setting the proxy to null.
realProxy = null;
// HS/AMS shut down
maxRetries--;
// if it's the AM that shut down, do not decrement maxClientRetry, as we
// wait for the AM to be restarted.
if (!usingAMProxy.get()) {
maxClientRetry--;
}
usingAMProxy.set(false);
lastException = new IOException(e.getTargetException());
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
LOG.warn("ClientServiceDelegate invoke call interrupted", ie);
throw new YarnRuntimeException(ie);
}
} catch (Exception e) {
LOG.debug("Failed to contact AM/History for job " + jobId
+ " Will retry..", e);
// Force reconnection by setting the proxy to null.
realProxy = null;
// RM shutdown
maxRetries--;
lastException = new IOException(e.getMessage());
maxClientRetry--;
lastException = new IOException(e.getMessage());
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
LOG.warn("ClientServiceDelegate invoke call interrupted", ie);
throw new YarnRuntimeException(ie);
}
}
}
throw lastException;
}
// Only for testing
@VisibleForTesting
public int getMaxClientRetry() {
return this.maxClientRetry;
}
public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);

View File

@ -140,6 +140,48 @@ public class TestClientServiceDelegate {
any(GetJobReportRequest.class));
}
@Test
public void testRetriesOnAMConnectionFailures() throws Exception {
if (!isAMReachableFromClient) {
return;
}
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
.thenReturn(getRunningApplicationReport("am1", 78));
// throw exception in 1st, 2nd, 3rd and 4th call of getJobReport, and
// succeed in the 5th call.
final MRClientProtocol amProxy = mock(MRClientProtocol.class);
when(amProxy.getJobReport(any(GetJobReportRequest.class)))
.thenThrow(new RuntimeException("11"))
.thenThrow(new RuntimeException("22"))
.thenThrow(new RuntimeException("33"))
.thenThrow(new RuntimeException("44")).thenReturn(getJobReportResponse());
Configuration conf = new YarnConfiguration();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
!isAMReachableFromClient);
ClientServiceDelegate clientServiceDelegate =
new ClientServiceDelegate(conf, rm, oldJobId, null) {
@Override
MRClientProtocol instantiateAMProxy(
final InetSocketAddress serviceAddr) throws IOException {
super.instantiateAMProxy(serviceAddr);
return amProxy;
}
};
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus);
// assert maxClientRetry is not decremented.
Assert.assertEquals(conf.getInt(MRJobConfig.MR_CLIENT_MAX_RETRIES,
MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES), clientServiceDelegate
.getMaxClientRetry());
verify(amProxy, times(5)).getJobReport(any(GetJobReportRequest.class));
}
@Test
public void testHistoryServerNotConfigured() throws Exception {
//RM doesn't have app report and job History Server is not configured

View File

@ -27,10 +27,15 @@ Release 2.3.0 - UNRELEASED
IMPROVEMENTS
YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)
YARN-1098. Separate out RM services into Always On and Active (Karthik
Kambatla via bikas)
YARN-1027. Implement RMHAProtocolService (Karthik Kambatla via bikas)
YARN-353. Add Zookeeper-based store implementation for RMStateStore.
(Bikas Saha, Jian He and Karthik Kambatla via hitesh)
OPTIMIZATIONS
BUG FIXES
@ -50,6 +55,9 @@ Release 2.2.0 - UNRELEASED
BUG FIXES
YARN-1128. FifoPolicy.computeShares throws NPE on empty list of Schedulables
(Karthik Kambatla via Sandy Ryza)
Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES
@ -121,6 +129,9 @@ Release 2.1.1-beta - 2013-09-23
YARN-1001. Added a web-service to get statistics about per application-type
per state for consumption by downstream projects. (Zhijie Shen via vinodkv)
YARN-1203. Changed YARN web-app proxy to handle http and https URLs from
AM registration and finish correctly. (Omkar Vinit Joshi via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -91,6 +91,8 @@ public abstract class FinishApplicationMasterRequest {
/**
* Get the <em>tracking URL</em> for the <code>ApplicationMaster</code>.
* If this URL contains a scheme then it will be used by the resource manager
* web application proxy, otherwise it will default to http.
* @return <em>tracking URL</em> for the <code>ApplicationMaster</code>
*/
@Public
@ -99,6 +101,8 @@ public abstract class FinishApplicationMasterRequest {
/**
* Set the <em>tracking URL</em> for the <code>ApplicationMaster</code>.
* If this URL contains a scheme then it will be used by the resource manager
* web application proxy, otherwise it will default to http.
* @param url <em>tracking URL</em> for the
* <code>ApplicationMaster</code>
*/

View File

@ -103,6 +103,8 @@ public abstract class RegisterApplicationMasterRequest {
/**
* Get the <em>tracking URL</em> for the <code>ApplicationMaster</code>.
* If this URL contains a scheme then it will be used by the resource manager
* web application proxy, otherwise it will default to http.
* @return <em>tracking URL</em> for the <code>ApplicationMaster</code>
*/
@Public
@ -111,6 +113,8 @@ public abstract class RegisterApplicationMasterRequest {
/**
* Set the <em>tracking URL</em> for the <code>ApplicationMaster</code>.
* If this URL contains a scheme then it will be used by the resource manager
* web application proxy, otherwise it will default to http.
* @param trackingUrl <em>tracking URL</em> for the
* <code>ApplicationMaster</code>
*/
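Concretely, the RMCommunicator change in this commit now registers with a
scheme-prefixed tracking URL. A minimal AM-side sketch with hypothetical host
and port values, assuming the newInstance factory that takes host, RPC port
and tracking URL:

import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;

public class RegisterSketch {
  public static RegisterApplicationMasterRequest build() {
    // A URL with an explicit scheme is honored by the RM web proxy;
    // a bare "host:port" would default to http.
    return RegisterApplicationMasterRequest.newInstance(
        "am.example.com", 8030, "https://am.example.com:8090");
  }
}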

View File

@ -26,6 +26,8 @@ import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@ -276,12 +278,40 @@ public class YarnConfiguration extends Configuration {
public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled";
public static final boolean DEFAULT_RM_HA_ENABLED = false;
////////////////////////////////
// RM state store configs
////////////////////////////////
/** The class to use as the persistent store.*/
public static final String RM_STORE = RM_PREFIX + "store.class";
/** URI for FileSystemRMStateStore */
public static final String FS_RM_STATE_STORE_URI =
RM_PREFIX + "fs.state-store.uri";
/**
* Comma separated host:port pairs, each corresponding to a ZK server for
* ZKRMStateStore
*/
public static final String ZK_STATE_STORE_PREFIX =
RM_PREFIX + "zk.state-store.";
public static final String ZK_RM_STATE_STORE_NUM_RETRIES =
ZK_STATE_STORE_PREFIX + "num-retries";
public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 3;
public static final String ZK_RM_STATE_STORE_ADDRESS =
ZK_STATE_STORE_PREFIX + "address";
/** Timeout in milliseconds for the ZK server connection for ZKRMStateStore */
public static final String ZK_RM_STATE_STORE_TIMEOUT_MS =
ZK_STATE_STORE_PREFIX + "timeout.ms";
public static final int DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS = 60000;
/** Parent znode path under which ZKRMStateStore will create znodes */
public static final String ZK_RM_STATE_STORE_PARENT_PATH =
ZK_STATE_STORE_PREFIX + "parent-path";
public static final String DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH = "/rmstore";
/** ACL for znodes in ZKRMStateStore */
public static final String ZK_RM_STATE_STORE_ACL =
ZK_STATE_STORE_PREFIX + "acl";
public static final String DEFAULT_ZK_RM_STATE_STORE_ACL =
"world:anyone:rwcda";
/** The maximum number of completed applications RM keeps. */
public static final String RM_MAX_COMPLETED_APPLICATIONS =
@ -837,7 +867,8 @@ public class YarnConfiguration extends Configuration {
}
public static String getRMWebAppURL(Configuration conf) {
return JOINER.join("http://", getRMWebAppHostAndPort(conf));
return JOINER.join(HttpConfig.getSchemePrefix(),
getRMWebAppHostAndPort(conf));
}
}
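With this change the RM web app URL tracks HttpConfig's scheme instead of
hard-coding http. A short sketch of the observable effect, assuming default
addresses:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RMWebUrlSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // "http://<rm-webapp-host:port>" by default; "https://<rm-webapp-host:port>"
    // when hadoop.ssl.enabled is true (HttpConfig.getSchemePrefix() picks it).
    System.out.println(YarnConfiguration.getRMWebAppURL(conf));
  }
}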

View File

@ -258,6 +258,51 @@
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value>
</property>
<property>
<description>Host:Port of the ZooKeeper server where RM state will
be stored. This must be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.address</name>
<!--value>127.0.0.1:2181</value-->
</property>
<property>
<description>Number of times ZKRMStateStore tries to connect to
ZooKeeper. This may be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.num-retries</name>
<value>3</value>
</property>
<property>
<description>Full path of the ZooKeeper znode where RM state will be
stored. This must be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.parent-path</name>
<value>/rmstore</value>
</property>
<property>
<description>Timeout when connecting to ZooKeeper.
This may be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.timeout.ms</name>
<value>60000</value>
</property>
<property>
<description>ACLs to be used for ZooKeeper znodes.
This may be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.acl</name>
<value>world:anyone:rwcda</value>
</property>
<property>
<description>URI pointing to the location of the FileSystem path where
RM state will be stored. This must be supplied when using

View File

@ -41,6 +41,16 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>

View File

@ -63,12 +63,6 @@ public class FileSystemRMStateStore extends RMStateStore {
public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
private static final String ROOT_DIR_NAME = "FSRMStateRoot";
private static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
private static final String RM_APP_ROOT = "RMAppRoot";
private static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
"RMDTSequenceNumber_";
protected FileSystem fs;

View File

@ -65,6 +65,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
*/
public abstract class RMStateStore extends AbstractService {
// constants for RM App state and RMDTSecretManagerState.
protected static final String RM_APP_ROOT = "RMAppRoot";
protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
"RMDTSequenceNumber_";
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
public RMStateStore() {
@ -464,8 +472,9 @@ public abstract class RMStateStore extends AbstractService {
(ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
.newApplicationAttemptStateData(attemptState.getAttemptId(),
attemptState.getMasterContainer(), appAttemptTokens);
LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
}
storeApplicationAttemptState(attemptState.getAttemptId().toString(),
attemptStateData);
} catch (Exception e) {

View File

@ -0,0 +1,621 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@Private
@Unstable
public class ZKRMStateStore extends RMStateStore {
public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
private static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
private int numRetries;
private String zkHostPort = null;
private int zkSessionTimeout;
private List<ACL> zkAcl;
private String zkRootNodePath;
private String rmDTSecretManagerRoot;
private String rmAppRoot;
private String dtSequenceNumberPath = null;
@VisibleForTesting
protected String znodeWorkingPath;
@VisibleForTesting
protected ZooKeeper zkClient;
private ZooKeeper oldZkClient;
@Override
public synchronized void initInternal(Configuration conf) throws Exception {
zkHostPort = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS);
if (zkHostPort == null) {
throw new YarnRuntimeException("No server address specified for " +
"zookeeper state store for Resource Manager recovery. " +
YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS + " is not configured.");
}
numRetries =
conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_NUM_RETRIES,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES);
znodeWorkingPath =
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
zkSessionTimeout =
conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS);
// Parse authentication from configuration.
String zkAclConf =
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ACL,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_ACL);
zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
try {
zkAcl = ZKUtil.parseACLs(zkAclConf);
} catch (ZKUtil.BadAclFormatException bafe) {
LOG.error("Invalid format for " + YarnConfiguration.ZK_RM_STATE_STORE_ACL);
throw bafe;
}
zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME;
rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT;
rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT;
}
@Override
public synchronized void startInternal() throws Exception {
// createConnection for future API calls
createConnection();
// ensure root dirs exist
createRootDir(znodeWorkingPath);
createRootDir(zkRootNodePath);
createRootDir(rmDTSecretManagerRoot);
createRootDir(rmAppRoot);
}
private void createRootDir(String rootPath) throws Exception {
try {
createWithRetries(rootPath, null, zkAcl, CreateMode.PERSISTENT);
} catch (KeeperException ke) {
if (ke.code() != Code.NODEEXISTS) {
throw ke;
}
}
}
private synchronized void closeZkClients() throws IOException {
if (zkClient != null) {
try {
zkClient.close();
} catch (InterruptedException e) {
throw new IOException("Interrupted while closing ZK", e);
}
zkClient = null;
}
if (oldZkClient != null) {
try {
oldZkClient.close();
} catch (InterruptedException e) {
throw new IOException("Interrupted while closing old ZK", e);
}
oldZkClient = null;
}
}
@Override
protected synchronized void closeInternal() throws Exception {
closeZkClients();
}
@Override
public synchronized RMState loadState() throws Exception {
RMState rmState = new RMState();
// recover DelegationTokenSecretManager
loadRMDTSecretManagerState(rmState);
// recover RM applications
loadRMAppState(rmState);
return rmState;
}
private synchronized void loadRMDTSecretManagerState(RMState rmState)
throws Exception {
List<String> childNodes = zkClient.getChildren(rmDTSecretManagerRoot, true);
for (String childNodeName : childNodes) {
if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
rmState.rmSecretManagerState.dtSequenceNumber =
Integer.parseInt(childNodeName.split("_")[1]);
continue;
}
String childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
byte[] childData = getDataWithRetries(childNodePath, true);
ByteArrayInputStream is = new ByteArrayInputStream(childData);
DataInputStream fsIn = new DataInputStream(is);
try {
if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
DelegationKey key = new DelegationKey();
key.readFields(fsIn);
rmState.rmSecretManagerState.masterKeyState.add(key);
} else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
RMDelegationTokenIdentifier identifier =
new RMDelegationTokenIdentifier();
identifier.readFields(fsIn);
long renewDate = fsIn.readLong();
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
renewDate);
}
} finally {
is.close();
}
}
}
private synchronized void loadRMAppState(RMState rmState) throws Exception {
List<String> childNodes = zkClient.getChildren(rmAppRoot, true);
List<ApplicationAttemptState> attempts =
new ArrayList<ApplicationAttemptState>();
for (String childNodeName : childNodes) {
String childNodePath = getNodePath(rmAppRoot, childNodeName);
byte[] childData = getDataWithRetries(childNodePath, true);
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application
LOG.info("Loading application from znode: " + childNodeName);
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
ApplicationStateDataPBImpl appStateData =
new ApplicationStateDataPBImpl(
ApplicationStateDataProto.parseFrom(childData));
ApplicationState appState =
new ApplicationState(appStateData.getSubmitTime(),
appStateData.getApplicationSubmissionContext(),
appStateData.getUser());
if (!appId.equals(appState.context.getApplicationId())) {
throw new YarnRuntimeException("The child node name is different " +
"from the application id");
}
rmState.appState.put(appId, appState);
} else if (childNodeName
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
// attempt
LOG.info("Loading application attempt from znode: " + childNodeName);
ApplicationAttemptId attemptId =
ConverterUtils.toApplicationAttemptId(childNodeName);
ApplicationAttemptStateDataPBImpl attemptStateData =
new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(childData));
Credentials credentials = null;
if (attemptStateData.getAppAttemptTokens() != null) {
credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
dibb.reset(attemptStateData.getAppAttemptTokens());
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials);
if (!attemptId.equals(attemptState.getAttemptId())) {
throw new YarnRuntimeException("The child node name is different " +
"from the application attempt id");
}
attempts.add(attemptState);
} else {
LOG.info("Unknown child node with name: " + childNodeName);
}
}
// go through all attempts and add them to their apps
for (ApplicationAttemptState attemptState : attempts) {
ApplicationId appId = attemptState.getAttemptId().getApplicationId();
ApplicationState appState = rmState.appState.get(appId);
if (appState != null) {
appState.attempts.put(attemptState.getAttemptId(), attemptState);
} else {
// the application znode may have been removed when the application
// completed but the RM might have stopped before it could remove the
// application attempt znodes
LOG.info("Application node not found for attempt: "
+ attemptState.getAttemptId());
deleteWithRetries(
getNodePath(rmAppRoot, attemptState.getAttemptId().toString()),
0);
}
}
}
@Override
public synchronized void storeApplicationState(
String appId, ApplicationStateDataPBImpl appStateDataPB) throws
Exception {
String nodeCreatePath = getNodePath(rmAppRoot, appId);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
}
byte[] appStateData = appStateDataPB.getProto().toByteArray();
createWithRetries(
nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
}
@Override
public synchronized void storeApplicationAttemptState(
String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for attempt: " + attemptId + " at: "
+ nodeCreatePath);
}
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
createWithRetries(nodeCreatePath, attemptStateData, zkAcl,
CreateMode.PERSISTENT);
}
@Override
public synchronized void removeApplicationState(ApplicationState appState)
throws Exception {
String appId = appState.getAppId().toString();
String nodeRemovePath = getNodePath(rmAppRoot, appId);
ArrayList<Op> opList = new ArrayList<Op>();
opList.add(Op.delete(nodeRemovePath, 0));
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString());
opList.add(Op.delete(attemptRemovePath, 0));
}
if (LOG.isDebugEnabled()) {
LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath
+ " and its attempts.");
}
doMultiWithRetries(opList);
}
@Override
protected synchronized void storeRMDelegationTokenAndSequenceNumberState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception {
ArrayList<Op> opList = new ArrayList<Op>();
// store RM delegation token
String nodeCreatePath =
getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os);
try {
rmDTIdentifier.write(fsOut);
fsOut.writeLong(renewDate);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing RMDelegationToken_" +
rmDTIdentifier.getSequenceNumber());
}
opList.add(Op.create(nodeCreatePath, os.toByteArray(), zkAcl,
CreateMode.PERSISTENT));
} finally {
os.close();
}
// store sequence number
String latestSequenceNumberPath =
getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX +
latestSequenceNumber);
}
if (dtSequenceNumberPath != null) {
opList.add(Op.delete(dtSequenceNumberPath, 0));
}
opList.add(Op.create(latestSequenceNumberPath, null, zkAcl,
CreateMode.PERSISTENT));
dtSequenceNumberPath = latestSequenceNumberPath;
doMultiWithRetries(opList);
}
@Override
protected synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
String nodeRemovePath =
getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationToken_"
+ rmDTIdentifier.getSequenceNumber());
}
deleteWithRetries(nodeRemovePath, 0);
}
@Override
protected synchronized void storeRMDTMasterKeyState(
DelegationKey delegationKey) throws Exception {
String nodeCreatePath =
getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
+ delegationKey.getKeyId());
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing RMDelegationKey_" + delegationKey.getKeyId());
}
delegationKey.write(fsOut);
try {
createWithRetries(nodeCreatePath, os.toByteArray(), zkAcl,
CreateMode.PERSISTENT);
} finally {
os.close();
}
}
@Override
protected synchronized void removeRMDTMasterKeyState(
DelegationKey delegationKey) throws Exception {
String nodeRemovePath =
getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
+ delegationKey.getKeyId());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
}
deleteWithRetries(nodeRemovePath, 0);
}
// ZK related code
/**
* Watcher implementation which forwards events to the ZKRMStateStore. This
* hides the ZK methods of the store from its public interface.
*/
private final class ForwardingWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
try {
ZKRMStateStore.this.processWatchEvent(event);
} catch (Throwable t) {
LOG.error("Failed to process watcher event " + event + ": "
+ StringUtils.stringifyException(t));
}
}
}
@VisibleForTesting
public synchronized void processWatchEvent(WatchedEvent event)
throws Exception {
Event.EventType eventType = event.getType();
LOG.info("Watcher event type: " + eventType + " with state:"
+ event.getState() + " for path:" + event.getPath() + " for " + this);
if (eventType == Event.EventType.None) {
// the connection state has changed
switch (event.getState()) {
case SyncConnected:
LOG.info("ZKRMStateStore Session connected");
if (oldZkClient != null) {
// the SyncConnected must be from the client that sent Disconnected
zkClient = oldZkClient;
oldZkClient = null;
ZKRMStateStore.this.notifyAll();
LOG.info("ZKRMStateStore Session restored");
}
break;
case Disconnected:
LOG.info("ZKRMStateStore Session disconnected");
oldZkClient = zkClient;
zkClient = null;
break;
case Expired:
// the connection got terminated because of session timeout
// call listener to reconnect
LOG.info("Session expired");
createConnection();
break;
default:
LOG.error("Unexpected Zookeeper" +
" watch event state: " + event.getState());
break;
}
}
}
@VisibleForTesting
String getNodePath(String root, String nodeName) {
return (root + "/" + nodeName);
}
@VisibleForTesting
public String createWithRetries(
final String path, final byte[] data, final List<ACL> acl,
final CreateMode mode) throws Exception {
return new ZKAction<String>() {
@Override
public String run() throws KeeperException, InterruptedException {
return zkClient.create(path, data, acl, mode);
}
}.runWithRetries();
}
private void deleteWithRetries(final String path, final int version)
throws Exception {
new ZKAction<Void>() {
@Override
public Void run() throws KeeperException, InterruptedException {
/**
* Call exists() to leave a watch on the node denoted by path, then
* call delete() regardless of whether the node exists, so that a
* missing node is reported to the caller as a KeeperException rather
* than being silently swallowed.
*/
if (zkClient.exists(path, true) == null) {
LOG.error("Trying to delete a path (" + path
+ ") that doesn't exist.");
}
zkClient.delete(path, version);
return null;
}
}.runWithRetries();
}
private void doMultiWithRetries(final ArrayList<Op> opList) throws Exception {
new ZKAction<Void>() {
@Override
public Void run() throws KeeperException, InterruptedException {
zkClient.multi(opList);
return null;
}
}.runWithRetries();
}
@VisibleForTesting
public void setDataWithRetries(final String path, final byte[] data,
final int version) throws Exception {
new ZKAction<Void>() {
@Override
public Void run() throws KeeperException, InterruptedException {
zkClient.setData(path, data, version);
return null;
}
}.runWithRetries();
}
@VisibleForTesting
public byte[] getDataWithRetries(final String path, final boolean watch)
throws Exception {
return new ZKAction<byte[]>() {
@Override
public byte[] run() throws KeeperException, InterruptedException {
Stat stat = new Stat();
return zkClient.getData(path, watch, stat);
}
}.runWithRetries();
}
private abstract class ZKAction<T> {
// run() expects synchronization on ZKRMStateStore.this
abstract T run() throws KeeperException, InterruptedException;
T runWithCheck() throws Exception {
long startTime = System.currentTimeMillis();
synchronized (ZKRMStateStore.this) {
while (zkClient == null) {
ZKRMStateStore.this.wait(zkSessionTimeout);
if (zkClient != null) {
break;
}
if (System.currentTimeMillis() - startTime > zkSessionTimeout) {
throw new IOException("Wait for ZKClient creation timed out");
}
}
return run();
}
}
T runWithRetries() throws Exception {
int retry = 0;
while (true) {
try {
return runWithCheck();
} catch (KeeperException ke) {
if (shouldRetry(ke.code()) && ++retry < numRetries) {
continue;
}
throw ke;
}
}
}
}
private static boolean shouldRetry(Code code) {
switch (code) {
case CONNECTIONLOSS:
case OPERATIONTIMEOUT:
return true;
default:
break;
}
return false;
}
private synchronized void createConnection()
throws IOException, InterruptedException {
closeZkClients();
for (int retries = 0; retries < numRetries && zkClient == null;
retries++) {
try {
zkClient = getNewZooKeeper();
} catch (IOException ioe) {
// Retry in case of network failures
LOG.info("Failed to connect to the ZooKeeper on attempt - " +
(retries + 1));
ioe.printStackTrace();
}
}
if (zkClient == null) {
LOG.error("Unable to connect to Zookeeper");
throw new YarnRuntimeException("Unable to connect to Zookeeper");
}
ZKRMStateStore.this.notifyAll();
LOG.info("Created new ZK connection");
}
// protected to mock for testing
@VisibleForTesting
protected synchronized ZooKeeper getNewZooKeeper()
throws IOException, InterruptedException {
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
zk.register(new ForwardingWatcher());
return zk;
}
}
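For context: every ZooKeeper call in the store above is funneled through the ZKAction template so that transient failures are retried uniformly by runWithRetries() and shouldRetry(). A minimal standalone sketch of that pattern (illustrative only — the class name, retry limit, and structure below are not part of this patch):

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.KeeperException.Code;

    abstract class RetryingZKOp<T> {
      private static final int NUM_RETRIES = 3; // hypothetical limit

      // the single ZooKeeper call being guarded
      abstract T run() throws KeeperException, InterruptedException;

      T runWithRetries() throws Exception {
        int retry = 0;
        while (true) {
          try {
            return run();
          } catch (KeeperException ke) {
            // retry only transient codes, mirroring shouldRetry() above
            boolean transientFailure = ke.code() == Code.CONNECTIONLOSS
                || ke.code() == Code.OPERATIONTIMEOUT;
            if (transientFailure && ++retry < NUM_RETRIES) {
              continue;
            }
            throw ke;
          }
        }
      }
    }

Anything that is not transient (NOAUTH, NONODE, and so on) is rethrown immediately, which is why the store's callers see permission and existence errors unretried.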
View File
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -73,6 +74,10 @@ public class FifoPolicy extends SchedulingPolicy {
@Override
public void computeShares(Collection<? extends Schedulable> schedulables,
Resource totalResources) {
if (schedulables.isEmpty()) {
return;
}
Schedulable earliest = null;
for (Schedulable schedulable : schedulables) {
if (earliest == null ||
View File
@ -26,8 +26,10 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import javax.crypto.SecretKey;
@ -40,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Text;
@ -67,13 +70,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Test;
public class TestRMStateStore extends ClientBaseWithFixes {
public static final Log LOG = LogFactory.getLog(TestRMStateStore.class);
static class TestDispatcher implements
Dispatcher, EventHandler<RMAppAttemptStoredEvent> {
ApplicationAttemptId attemptId;
Exception storedException;
@ -82,7 +89,8 @@ public class TestRMStateStore {
@SuppressWarnings("rawtypes")
@Override
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
}
@Override
@ -108,10 +116,18 @@ public class TestRMStateStore {
boolean isFinalStateValid() throws Exception;
}
@Test
public void testZKRMStateStoreRealZK() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
testRMAppStateStore(zkTester);
testRMDTSecretManagerStateStore(zkTester);
}
@Test
public void testFSRMStateStore() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
testRMAppStateStore(fsTester);
@ -121,6 +137,41 @@ public class TestRMStateStore {
}
}
class TestZKRMStateStoreTester implements RMStateStoreHelper {
ZooKeeper client;
ZKRMStateStore store;
class TestZKRMStateStore extends ZKRMStateStore {
public TestZKRMStateStore(Configuration conf, String workingZnode)
throws Exception {
init(conf);
start();
assertTrue(znodeWorkingPath.equals(workingZnode));
}
@Override
public ZooKeeper getNewZooKeeper() throws IOException {
return client;
}
}
public RMStateStore getRMStateStore() throws Exception {
String workingZnode = "/Test";
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.client = createClient();
this.store = new TestZKRMStateStore(conf, workingZnode);
return this.store;
}
@Override
public boolean isFinalStateValid() throws Exception {
List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
return nodes.size() == 1;
}
}
class TestFSRMStateStoreTester implements RMStateStoreHelper {
Path workingDirPathURI;
FileSystemRMStateStore store;
@ -149,7 +200,8 @@ public class TestRMStateStore {
@Override
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
workingDirPathURI.toString());
this.store = new TestFileSystemRMStore(conf);
return store;
}
@ -158,11 +210,7 @@ public class TestRMStateStore {
public boolean isFinalStateValid() throws Exception {
FileSystem fs = cluster.getFileSystem();
FileStatus[] files = fs.listStatus(workingDirPathURI);
return files.length == 1;
}
}
@ -183,9 +231,10 @@ public class TestRMStateStore {
dispatcher.notified = false;
}
void storeApp(
RMStateStore store, ApplicationId appId, long time) throws Exception {
ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl();
context.setApplicationId(appId);
RMApp mockApp = mock(RMApp.class);
@ -216,7 +265,8 @@ public class TestRMStateStore {
return container.getId();
}
void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
throws Exception {
long submitTime = System.currentTimeMillis();
Configuration conf = new YarnConfiguration();
RMStateStore store = stateStoreHelper.getRMStateStore();
@ -271,7 +321,8 @@ public class TestRMStateStore {
RMApp mockRemovedApp = mock(RMApp.class);
HashMap<ApplicationAttemptId, RMAppAttempt> attempts =
new HashMap<ApplicationAttemptId, RMAppAttempt>();
ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl();
context.setApplicationId(appIdRemoved);
when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime);
when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context);
@ -288,7 +339,8 @@ public class TestRMStateStore {
// load state
store = stateStoreHelper.getRMStateStore();
RMState state = store.loadState();
Map<ApplicationId, ApplicationState> rmAppState =
state.getApplicationState();
ApplicationState appState = rmAppState.get(appId1);
// app is loaded
@ -362,7 +414,8 @@ public class TestRMStateStore {
store.loadState().getRMDTSecretManagerState();
Assert.assertEquals(token1, secretManagerState.getTokenState());
Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber,
secretManagerState.getDTSequenceNumber());
}
private Token<AMRMTokenIdentifier> generateAMRMToken(
View File
@ -0,0 +1,218 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestRMStateStore.TestDispatcher;
import org.apache.hadoop.util.ZKUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CyclicBarrier;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestZKRMStateStoreZKClientConnections extends
ClientBaseWithFixes {
private static final int ZK_OP_WAIT_TIME = 3000;
private Log LOG =
LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
class TestZKClient {
ZKRMStateStore store;
boolean forExpire = false;
TestForwardingWatcher watcher;
CyclicBarrier syncBarrier = new CyclicBarrier(2);
protected class TestZKRMStateStore extends ZKRMStateStore {
public TestZKRMStateStore(Configuration conf, String workingZnode)
throws Exception {
init(conf);
start();
assertTrue(znodeWorkingPath.equals(workingZnode));
}
@Override
public ZooKeeper getNewZooKeeper()
throws IOException, InterruptedException {
return createClient(watcher, hostPort, 100);
}
@Override
public synchronized void processWatchEvent(WatchedEvent event)
throws Exception {
if (forExpire) {
// a hack... couldn't find a way to trigger expired event.
WatchedEvent expiredEvent = new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null);
super.processWatchEvent(expiredEvent);
forExpire = false;
syncBarrier.await();
} else {
super.processWatchEvent(event);
}
}
}
private class TestForwardingWatcher extends
ClientBaseWithFixes.CountdownWatcher {
public void process(WatchedEvent event) {
super.process(event);
try {
if (store != null) {
store.processWatchEvent(event);
}
} catch (Throwable t) {
LOG.error("Failed to process watcher event " + event + ": "
+ StringUtils.stringifyException(t));
}
}
}
public RMStateStore getRMStateStore(Configuration conf) throws Exception {
String workingZnode = "/Test";
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
watcher = new TestForwardingWatcher();
this.store = new TestZKRMStateStore(conf, workingZnode);
return this.store;
}
}
@Test(timeout = 20000)
public void testZKClientDisconnectAndReconnect()
throws Exception {
TestZKClient zkClientTester = new TestZKClient();
String path = "/test";
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
ZKRMStateStore store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
// trigger watch
store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
store.getDataWithRetries(path, true);
store.setDataWithRetries(path, "newBytes".getBytes(), 0);
stopServer();
zkClientTester.watcher.waitForDisconnected(ZK_OP_WAIT_TIME);
try {
store.getDataWithRetries(path, true);
fail("Expected ZKClient time out exception");
} catch (Exception e) {
assertTrue(e.getMessage().contains(
"Wait for ZKClient creation timed out"));
}
// ZKRMStateStore Session restored
startServer();
zkClientTester.watcher.waitForConnected(ZK_OP_WAIT_TIME);
byte[] ret = null;
try {
ret = store.getDataWithRetries(path, true);
} catch (Exception e) {
String error = "ZKRMStateStore Session restore failed";
LOG.error(error, e);
fail(error);
}
Assert.assertEquals("newBytes", new String(ret));
}
@Test(timeout = 20000)
public void testZKSessionTimeout() throws Exception {
TestZKClient zkClientTester = new TestZKClient();
String path = "/test";
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
ZKRMStateStore store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
// a hack to trigger expired event
zkClientTester.forExpire = true;
// trigger watch
store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
store.getDataWithRetries(path, true);
store.setDataWithRetries(path, "bytes".getBytes(), 0);
zkClientTester.syncBarrier.await();
// after this point, expired event has already been processed.
try {
byte[] ret = store.getDataWithRetries(path, false);
Assert.assertEquals("bytes", new String(ret));
} catch (Exception e) {
String error = "New session creation failed";
LOG.error(error, e);
fail(error);
}
}
@Test (timeout = 20000)
public void testSetZKAcl() {
TestZKClient zkClientTester = new TestZKClient();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ACL, "world:anyone:rwca");
try {
// the store must be created first; otherwise zkClientTester.store is
// null and the test passes vacuously on a NullPointerException
zkClientTester.getRMStateStore(conf);
zkClientTester.store.zkClient.delete(
zkClientTester.store.znodeWorkingPath, -1);
fail("Shouldn't be able to delete path");
} catch (Exception e) {/* expected behavior */}
}
@Test (timeout = 20000)
public void testInvalidZKAclConfiguration() {
TestZKClient zkClientTester = new TestZKClient();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ACL, "randomstring&*");
try {
zkClientTester.getRMStateStore(conf);
fail("ZKRMStateStore created with bad ACL");
} catch (ZKUtil.BadAclFormatException bafe) {
// expected behavior
} catch (Exception e) {
String error = "Incorrect exception on BadAclFormat";
LOG.error(error, e);
fail(error);
}
}
}
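The forExpire hack above fabricates an Expired event because the test found no direct way to expire a live session. For reference, a commonly used alternative is to open a second ZooKeeper handle with the victim's session id and password and then close it, which makes the server expire the original session. A sketch under that assumption (this helper and its timeout are illustrative, not part of the patch):

    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    class SessionExpirer {
      // Forces expiry of victim's session; hostPort and the 5000 ms
      // session timeout are assumptions for illustration.
      static void expireSession(ZooKeeper victim, String hostPort)
          throws Exception {
        ZooKeeper duplicate = new ZooKeeper(hostPort, 5000, new Watcher() {
          @Override
          public void process(WatchedEvent event) {
            // no-op: only the connection handshake matters here
          }
        }, victim.getSessionId(), victim.getSessionPasswd());
        // in a real test, wait for the duplicate handle to connect first;
        // closing it invalidates the shared session, so the victim's
        // watcher then receives an Expired event from the server
        duplicate.close();
      }
    }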
View File
@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
public class TestEmptyQueues {
private Collection<? extends Schedulable> schedulables;
@Before
public void setup() {
schedulables = new ArrayList<Schedulable>();
}
private void testComputeShares(SchedulingPolicy policy) {
policy.computeShares(schedulables, Resources.none());
}
@Test (timeout = 1000)
public void testFifoPolicy() {
testComputeShares(SchedulingPolicy.getInstance(FifoPolicy.class));
}
@Test (timeout = 1000)
public void testFairSharePolicy() {
testComputeShares(SchedulingPolicy.getInstance(FairSharePolicy.class));
}
@Test (timeout = 1000)
public void testDRFPolicy() {
testComputeShares(
SchedulingPolicy.getInstance(DominantResourceFairnessPolicy.class));
}
}
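These tests exercise the empty-collection guard added to FifoPolicy.computeShares above: without the early return, an empty schedulable set presumably leaves earliest null and the method dereferences it. A condensed illustration of the failure mode the guard prevents (names simplified; not the actual scheduler code):

    import java.util.Collection;
    import java.util.Collections;

    class EmptyQueueGuardDemo {
      static Long pickEarliest(Collection<Long> startTimes) {
        if (startTimes.isEmpty()) {
          return null; // the guard: nothing to schedule, nothing to compute
        }
        Long earliest = null;
        for (Long t : startTimes) {
          if (earliest == null || t < earliest) {
            earliest = t;
          }
        }
        return earliest; // non-null once the collection is non-empty
      }

      public static void main(String[] args) {
        // with the guard, an empty queue is a no-op instead of an NPE
        System.out.println(pickEarliest(Collections.<Long>emptyList()));
      }
    }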
View File
@ -135,16 +135,46 @@ public class ProxyUriUtils {
}
}
/**
* Create a URI from a no-scheme URL, such as is returned by the AM.
* @param url the URL as returned by an AM. This may or may not contain a
* scheme.
* @return a URI; if the url carried no scheme, the default configured via
* "hadoop.ssl.enabled" is prepended
* @throws URISyntaxException if the url is not formatted correctly.
*/
public static URI getUriFromAMUrl(String url)
throws URISyntaxException {
if (getSchemeFromUrl(url).isEmpty()) {
/*
* check is made to make sure if AM reports with scheme then it will be
* used by default otherwise it will default to the one configured using
* "hadoop.ssl.enabled".
*/
return new URI(HttpConfig.getSchemePrefix() + url);
} else {
return new URI(url);
}
}
public static URI getUriFromAMUrl(String scheme, String noSchemeUrl)
throws URISyntaxException {
if (getSchemeFromUrl(noSchemeUrl).isEmpty()) {
/*
* check is made to make sure if AM reports with scheme then it will be
* used by default otherwise it will default to the one configured using
* "hadoop.ssl.enabled".
*/
return new URI(scheme + "://" + noSchemeUrl);
} else {
return new URI(noSchemeUrl);
}
}
/**
* Returns the first valid tracking link, if any, from the given id from the
@ -169,4 +199,20 @@ public class ProxyUriUtils {
}
return null;
}
/**
* Returns the scheme if present in the url, otherwise the empty string;
* e.g. "https://issues.apache.org/jira/browse/YARN" -> "https"
*/
public static String getSchemeFromUrl(String url) {
int index = 0;
if (url != null) {
index = url.indexOf("://");
}
if (index > 0) {
return url.substring(0, index);
} else {
return "";
}
}
}
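Taken together, the helpers above give the proxy its scheme-defaulting behavior: a caller-supplied scheme is applied only when the AM reported none. A small usage sketch (assuming ProxyUriUtils lives in org.apache.hadoop.yarn.server.webproxy; the host names are made up):

    import java.net.URI;
    import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;

    public class AMUrlSchemeDemo {
      public static void main(String[] args) throws Exception {
        // AM reported no scheme: the caller-supplied one is applied
        URI plain = ProxyUriUtils.getUriFromAMUrl(
            "http", "am.example.com:8088/app");
        System.out.println(plain); // http://am.example.com:8088/app

        // AM already reported a scheme: it is kept as-is
        URI secure = ProxyUriUtils.getUriFromAMUrl(
            "http", "https://am.example.com:8090/app");
        System.out.println(secure); // https://am.example.com:8090/app

        // scheme extraction used by the checks above
        System.out.println(
            ProxyUriUtils.getSchemeFromUrl("https://issues.apache.org")); // https
        System.out.println(
            ProxyUriUtils.getSchemeFromUrl("am.example.com:8088"));       // ""
      }
    }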
View File
@ -163,7 +163,6 @@ public class WebAppProxyServlet extends HttpServlet {
}
config.setLocalAddress(localAddress);
HttpMethod method = new GetMethod(uri.getEscapedURI());
@SuppressWarnings("unchecked")
Enumeration<String> names = req.getHeaderNames();
while(names.hasMoreElements()) {
@ -293,14 +292,17 @@ public class WebAppProxyServlet extends HttpServlet {
}
String original = applicationReport.getOriginalTrackingUrl();
URI trackingUri = null;
// fallback to ResourceManager's app page if no tracking URI provided
if(original == null || original.equals("N/A")) {
resp.sendRedirect(resp.encodeRedirectURL(
StringHelper.pjoin(rmAppPageUrlBase, id.toString())));
return;
} else {
if (ProxyUriUtils.getSchemeFromUrl(original).isEmpty()) {
trackingUri = ProxyUriUtils.getUriFromAMUrl("http", original);
} else {
trackingUri = new URI(original);
}
}
String runningUser = applicationReport.getUser();
@ -311,8 +313,7 @@ public class WebAppProxyServlet extends HttpServlet {
req.getQueryString(), true), runningUser, id);
return;
}
URI toFetch = new URI(trackingUri.getScheme(),
trackingUri.getAuthority(),
StringHelper.ujoin(trackingUri.getPath(), rest), req.getQueryString(),
null);