Merging r1519884 through r1520449 from trunk to branch HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1520454 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-09-05 23:01:00 +00:00
commit bbce64c8c5
30 changed files with 583 additions and 90 deletions

View File

@ -330,6 +330,9 @@ Release 2.3.0 - UNRELEASED
HADOOP-9889. Refresh the Krb5 configuration when creating a new kdc in
Hadoop-MiniKDC (Wei Yan via Sandy Ryza)
HADOOP-9915. o.a.h.fs.Stat support on Mac OS X (Binglin Chang via Colin
Patrick McCabe)
OPTIMIZATIONS
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
@ -360,6 +363,8 @@ Release 2.3.0 - UNRELEASED
HADOOP-9909. org.apache.hadoop.fs.Stat should permit other LANG.
(Shinichi Yamashita via Andrew Wang)
HADOOP-9908. Fix NPE when versioninfo properties file is missing (todd)
Release 2.1.1-beta - UNRELEASED
INCOMPATIBLE CHANGES
@ -406,6 +411,8 @@ Release 2.1.1-beta - UNRELEASED
BUG FIXES
HADOOP-9916. Fix race in ipc.Client retry. (Binglin Chang via llu)
HADOOP-9768. chown and chgrp reject users and groups with spaces on platforms
where spaces are otherwise acceptable. (cnauroth)
@ -444,6 +451,11 @@ Release 2.1.1-beta - UNRELEASED
HADOOP-9774. RawLocalFileSystem.listStatus() return absolute paths when
input path is relative on Windows. (Shanyu Zhao via ivanmi)
HADOOP-9924. FileUtil.createJarWithClassPath() does not generate relative
classpath correctly. (Shanyu Zhao via ivanmi)
HADOOP-9932. Improper synchronization in RetryCache. (kihwal)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -1252,7 +1252,14 @@ public class FileUtil {
}
} else {
// Append just this entry
String classPathEntryUrl = new File(classPathEntry).toURI().toURL()
File fileCpEntry = null;
if(!new Path(classPathEntry).isAbsolute()) {
fileCpEntry = new File(workingDir, classPathEntry);
}
else {
fileCpEntry = new File(classPathEntry);
}
String classPathEntryUrl = fileCpEntry.toURI().toURL()
.toExternalForm();
// File.toURI only appends trailing '/' if it can determine that it is a

View File

@ -80,7 +80,7 @@ public class Stat extends Shell {
* @return
*/
public static boolean isAvailable() {
if (Shell.LINUX || Shell.FREEBSD) {
if (Shell.LINUX || Shell.FREEBSD || Shell.MAC) {
return true;
}
return false;
@ -100,7 +100,7 @@ public class Stat extends Shell {
if (Shell.LINUX) {
return new String[] {
"stat", derefFlag + "c", "%s,%F,%Y,%X,%a,%U,%G,%N", path.toString() };
} else if (Shell.FREEBSD) {
} else if (Shell.FREEBSD || Shell.MAC) {
return new String[] {
"stat", derefFlag + "f", "%z,%HT,%m,%a,%Op,%Su,%Sg,`link' -> `%Y'",
path.toString() };

View File

@ -1063,8 +1063,8 @@ public class Client {
if (status == RpcStatusProto.SUCCESS) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
call.setRpcResponse(value);
calls.remove(callId);
call.setRpcResponse(value);
// verify that length was correct
// only for ProtobufEngine where len can be verified easily
@ -1098,8 +1098,8 @@ public class Client {
new RemoteException(exceptionClassName, errorMsg) :
new RemoteException(exceptionClassName, errorMsg, erCode));
if (status == RpcStatusProto.ERROR) {
call.setException(re);
calls.remove(callId);
call.setException(re);
} else if (status == RpcStatusProto.FATAL) {
// Close the connection
markClosed(re);
@ -1166,8 +1166,8 @@ public class Client {
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
while (itor.hasNext()) {
Call c = itor.next().getValue();
itor.remove();
c.setException(closeException); // local exception
itor.remove();
}
}
}

View File

@ -76,6 +76,12 @@ public class RetryCache {
this.expirationTime = expirationTime;
}
CacheEntry(byte[] clientId, int callId, long expirationTime,
boolean success) {
this(clientId, callId, expirationTime);
this.state = success ? SUCCESS : FAILED;
}
private static int hashCode(long value) {
return (int)(value ^ (value >>> 32));
}
@ -147,6 +153,12 @@ public class RetryCache {
this.payload = payload;
}
CacheEntryWithPayload(byte[] clientId, int callId, Object payload,
long expirationTime, boolean success) {
super(clientId, callId, expirationTime, success);
this.payload = payload;
}
/** Override equals to avoid findbugs warnings */
@Override
public boolean equals(Object obj) {
@ -253,18 +265,20 @@ public class RetryCache {
*/
public void addCacheEntry(byte[] clientId, int callId) {
CacheEntry newEntry = new CacheEntry(clientId, callId, System.nanoTime()
+ expirationTime);
newEntry.completed(true);
set.put(newEntry);
+ expirationTime, true);
synchronized(this) {
set.put(newEntry);
}
}
public void addCacheEntryWithPayload(byte[] clientId, int callId,
Object payload) {
CacheEntry newEntry = new CacheEntryWithPayload(clientId, callId, payload,
System.nanoTime() + expirationTime);
// since the entry is loaded from editlog, we can assume it succeeded.
newEntry.completed(true);
set.put(newEntry);
CacheEntry newEntry = new CacheEntryWithPayload(clientId, callId, payload,
System.nanoTime() + expirationTime, true);
synchronized(this) {
set.put(newEntry);
}
}
private static CacheEntry newEntry(long expirationTime) {

View File

@ -48,6 +48,9 @@ public class VersionInfo {
try {
InputStream is = Thread.currentThread().getContextClassLoader()
.getResourceAsStream(versionInfoFile);
if (is == null) {
throw new IOException("Resource not found");
}
info.load(is);
} catch (IOException ex) {
LogFactory.getLog(getClass()).warn("Could not read '" +

View File

@ -782,14 +782,23 @@ public class TestFileUtil {
expectedClassPaths.add(wildcardMatch.toURI().toURL()
.toExternalForm());
}
} else if (nonExistentSubdir.equals(classPath)) {
// expect to maintain trailing path separator if present in input, even
// if directory doesn't exist yet
expectedClassPaths.add(new File(classPath).toURI().toURL()
.toExternalForm() + Path.SEPARATOR);
} else {
expectedClassPaths.add(new File(classPath).toURI().toURL()
.toExternalForm());
File fileCp = null;
if(!new Path(classPath).isAbsolute()) {
fileCp = new File(tmp, classPath);
}
else {
fileCp = new File(classPath);
}
if (nonExistentSubdir.equals(classPath)) {
// expect to maintain trailing path separator if present in input, even
// if directory doesn't exist yet
expectedClassPaths.add(fileCp.toURI().toURL()
.toExternalForm() + Path.SEPARATOR);
} else {
expectedClassPaths.add(fileCp.toURI().toURL()
.toExternalForm());
}
}
}
List<String> actualClassPaths = Arrays.asList(classPathAttr.split(" "));

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.fs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.BufferedReader;
@ -26,10 +27,11 @@ import java.io.FileNotFoundException;
import java.io.StringReader;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestStat {
public class TestStat extends FileSystemTestHelper {
private static Stat stat;
@ -113,6 +115,7 @@ public class TestStat {
@Test(timeout=10000)
public void testStatFileNotFound() throws Exception {
Assume.assumeTrue(Stat.isAvailable());
try {
stat.getFileStatus();
fail("Expected FileNotFoundException");
@ -125,4 +128,21 @@ public class TestStat {
public void testStatEnvironment() throws Exception {
assertEquals(stat.getEnvironment("LANG"), "C");
}
@Test(timeout=10000)
public void testStat() throws Exception {
Assume.assumeTrue(Stat.isAvailable());
FileSystem fs = FileSystem.getLocal(new Configuration());
Path testDir = new Path(getTestRootPath(fs), "teststat");
fs.mkdirs(testDir);
Path sub1 = new Path(testDir, "sub1");
Path sub2 = new Path(testDir, "sub2");
fs.mkdirs(sub1);
fs.createSymlink(sub1, sub2, false);
FileStatus stat1 = new Stat(sub1, 4096l, false, fs).getFileStatus();
FileStatus stat2 = new Stat(sub2, 0, false, fs).getFileStatus();
assertTrue(stat1.isDirectory());
assertFalse(stat2.isDirectory());
fs.delete(testDir, true);
}
}

View File

@ -216,13 +216,13 @@ public class TestIPC {
}
}
@Test
@Test(timeout=60000)
public void testSerial() throws IOException, InterruptedException {
testSerial(3, false, 2, 5, 100);
testSerial(3, true, 2, 5, 10);
internalTestSerial(3, false, 2, 5, 100);
internalTestSerial(3, true, 2, 5, 10);
}
public void testSerial(int handlerCount, boolean handlerSleep,
public void internalTestSerial(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount)
throws IOException, InterruptedException {
Server server = new TestServer(handlerCount, handlerSleep);
@ -249,7 +249,7 @@ public class TestIPC {
server.stop();
}
@Test
@Test(timeout=60000)
public void testStandAloneClient() throws IOException {
Client client = new Client(LongWritable.class, conf);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
@ -383,7 +383,7 @@ public class TestIPC {
}
}
@Test
@Test(timeout=60000)
public void testIOEOnClientWriteParam() throws Exception {
doErrorTest(IOEOnWriteWritable.class,
LongWritable.class,
@ -391,7 +391,7 @@ public class TestIPC {
LongWritable.class);
}
@Test
@Test(timeout=60000)
public void testRTEOnClientWriteParam() throws Exception {
doErrorTest(RTEOnWriteWritable.class,
LongWritable.class,
@ -399,7 +399,7 @@ public class TestIPC {
LongWritable.class);
}
@Test
@Test(timeout=60000)
public void testIOEOnServerReadParam() throws Exception {
doErrorTest(LongWritable.class,
IOEOnReadWritable.class,
@ -407,7 +407,7 @@ public class TestIPC {
LongWritable.class);
}
@Test
@Test(timeout=60000)
public void testRTEOnServerReadParam() throws Exception {
doErrorTest(LongWritable.class,
RTEOnReadWritable.class,
@ -416,7 +416,7 @@ public class TestIPC {
}
@Test
@Test(timeout=60000)
public void testIOEOnServerWriteResponse() throws Exception {
doErrorTest(LongWritable.class,
LongWritable.class,
@ -424,7 +424,7 @@ public class TestIPC {
LongWritable.class);
}
@Test
@Test(timeout=60000)
public void testRTEOnServerWriteResponse() throws Exception {
doErrorTest(LongWritable.class,
LongWritable.class,
@ -432,7 +432,7 @@ public class TestIPC {
LongWritable.class);
}
@Test
@Test(timeout=60000)
public void testIOEOnClientReadResponse() throws Exception {
doErrorTest(LongWritable.class,
LongWritable.class,
@ -440,7 +440,7 @@ public class TestIPC {
IOEOnReadWritable.class);
}
@Test
@Test(timeout=60000)
public void testRTEOnClientReadResponse() throws Exception {
doErrorTest(LongWritable.class,
LongWritable.class,
@ -453,7 +453,7 @@ public class TestIPC {
* that a ping should have been sent. This is a reproducer for a
* deadlock seen in one iteration of HADOOP-6762.
*/
@Test
@Test(timeout=60000)
public void testIOEOnWriteAfterPingClient() throws Exception {
// start server
Client.setPingInterval(conf, 100);
@ -481,7 +481,7 @@ public class TestIPC {
* Test that, if the socket factory throws an IOE, it properly propagates
* to the client.
*/
@Test
@Test(timeout=60000)
public void testSocketFactoryException() throws IOException {
SocketFactory mockFactory = mock(SocketFactory.class);
doThrow(new IOException("Injected fault")).when(mockFactory).createSocket();
@ -503,7 +503,7 @@ public class TestIPC {
* failure is handled properly. This is a regression test for
* HADOOP-7428.
*/
@Test
@Test(timeout=60000)
public void testRTEDuringConnectionSetup() throws IOException {
// Set up a socket factory which returns sockets which
// throw an RTE when setSoTimeout is called.
@ -544,7 +544,7 @@ public class TestIPC {
}
}
@Test
@Test(timeout=60000)
public void testIpcTimeout() throws IOException {
// start server
Server server = new TestServer(1, true);
@ -566,7 +566,7 @@ public class TestIPC {
addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
}
@Test
@Test(timeout=60000)
public void testIpcConnectTimeout() throws IOException {
// start server
Server server = new TestServer(1, true);
@ -670,31 +670,31 @@ public class TestIPC {
return FD_DIR.list().length;
}
@Test
@Test(timeout=60000)
public void testIpcFromHadoop_0_18_13() throws IOException {
doIpcVersionTest(NetworkTraces.HADOOP_0_18_3_RPC_DUMP,
NetworkTraces.RESPONSE_TO_HADOOP_0_18_3_RPC);
}
@Test
@Test(timeout=60000)
public void testIpcFromHadoop0_20_3() throws IOException {
doIpcVersionTest(NetworkTraces.HADOOP_0_20_3_RPC_DUMP,
NetworkTraces.RESPONSE_TO_HADOOP_0_20_3_RPC);
}
@Test
@Test(timeout=60000)
public void testIpcFromHadoop0_21_0() throws IOException {
doIpcVersionTest(NetworkTraces.HADOOP_0_21_0_RPC_DUMP,
NetworkTraces.RESPONSE_TO_HADOOP_0_21_0_RPC);
}
@Test
@Test(timeout=60000)
public void testHttpGetResponse() throws IOException {
doIpcVersionTest("GET / HTTP/1.0\r\n\r\n".getBytes(),
Server.RECEIVED_HTTP_REQ_RESPONSE.getBytes());
}
@Test
@Test(timeout=60000)
public void testConnectionRetriesOnSocketTimeoutExceptions() throws IOException {
Configuration conf = new Configuration();
// set max retries to 0
@ -720,7 +720,7 @@ public class TestIPC {
* (1) the rpc server uses the call id/retry provided by the rpc client, and
* (2) the rpc client receives the same call id/retry from the rpc server.
*/
@Test
@Test(timeout=60000)
public void testCallIdAndRetry() throws IOException {
final CallInfo info = new CallInfo();
@ -772,7 +772,7 @@ public class TestIPC {
/**
* Test the retry count while used in a retry proxy.
*/
@Test
@Test(timeout=60000)
public void testRetryProxy() throws IOException {
final Client client = new Client(LongWritable.class, conf);
@ -785,7 +785,9 @@ public class TestIPC {
}
};
final int totalRetry = 256;
// try more times, so it is easier to find race condition bug
// 10000 times runs about 6s on a core i7 machine
final int totalRetry = 10000;
DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance(
DummyProtocol.class.getClassLoader(),
new Class[] { DummyProtocol.class }, new TestInvocationHandler(client,
@ -807,7 +809,7 @@ public class TestIPC {
/**
* Test if the rpc server gets the default retry count (0) from client.
*/
@Test
@Test(timeout=60000)
public void testInitialCallRetryCount() throws IOException {
// Override client to store the call id
final Client client = new Client(LongWritable.class, conf);
@ -838,7 +840,7 @@ public class TestIPC {
/**
* Test if the rpc server gets the retry count from client.
*/
@Test
@Test(timeout=60000)
public void testCallRetryCount() throws IOException {
final int retryCount = 255;
// Override client to store the call id
@ -873,7 +875,7 @@ public class TestIPC {
* even if multiple threads are using the same client.
* @throws InterruptedException
*/
@Test
@Test(timeout=60000)
public void testUniqueSequentialCallIds()
throws IOException, InterruptedException {
int serverThreads = 10, callerCount = 100, perCallerCallCount = 100;

View File

@ -415,6 +415,9 @@ Release 2.1.1-beta - UNRELEASED
HDFS-5140. Too many safemode monitor threads being created in the standby
namenode causing it to fail with out of memory error. (jing9)
HDFS-5159. Secondary NameNode fails to checkpoint if error occurs
downloading edits on first checkpoint. (atm)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -429,10 +429,8 @@ public class SecondaryNameNode implements Runnable {
dstImage.getStorage().cTime = sig.cTime;
// get fsimage
boolean downloadImage = true;
if (sig.mostRecentCheckpointTxId ==
dstImage.getStorage().getMostRecentCheckpointTxId()) {
downloadImage = false;
LOG.info("Image has not changed. Will not download image.");
} else {
LOG.info("Image has changed. Downloading updated image from NN.");
@ -448,7 +446,9 @@ public class SecondaryNameNode implements Runnable {
nnHostPort, log, dstImage.getStorage());
}
return Boolean.valueOf(downloadImage);
// true if we haven't loaded all the transactions represented by the
// downloaded fsimage.
return dstImage.getLastAppliedTxId() < sig.mostRecentCheckpointTxId;
}
});
return b.booleanValue();

View File

@ -39,7 +39,6 @@ import java.util.Collection;
import java.util.List;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
@ -1224,7 +1223,6 @@ public class TestCheckpoint {
}
/* Test case to test CheckpointSignature */
@SuppressWarnings("deprecation")
@Test
public void testCheckpointSignature() throws IOException {
@ -1562,12 +1560,65 @@ public class TestCheckpoint {
Mockito.reset(faultInjector);
}
}
/**
* Test that a fault while downloading edits the first time after the 2NN
* starts up does not prevent future checkpointing.
*/
@Test(timeout = 30000)
public void testEditFailureOnFirstCheckpoint() throws IOException {
Configuration conf = new HdfsConfiguration();
SecondaryNameNode secondary = null;
MiniDFSCluster cluster = null;
FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
.build();
cluster.waitActive();
fs = cluster.getFileSystem();
fs.mkdirs(new Path("test-file-1"));
// Make sure the on-disk fsimage on the NN has txid > 0.
FSNamesystem fsns = cluster.getNamesystem();
fsns.enterSafeMode(false);
fsns.saveNamespace();
fsns.leaveSafeMode();
secondary = startSecondaryNameNode(conf);
// Cause edit rename to fail during next checkpoint
Mockito.doThrow(new IOException("Injecting failure before edit rename"))
.when(faultInjector).beforeEditsRename();
try {
secondary.doCheckpoint();
fail("Fault injection failed.");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
"Injecting failure before edit rename", ioe);
}
Mockito.reset(faultInjector);
// Next checkpoint should succeed
secondary.doCheckpoint();
} finally {
if (secondary != null) {
secondary.shutdown();
}
if (fs != null) {
fs.close();
}
if (cluster != null) {
cluster.shutdown();
}
Mockito.reset(faultInjector);
}
}
/**
* Test that the secondary namenode correctly deletes temporary edits
* on startup.
*/
@Test(timeout = 30000)
public void testDeleteTemporaryEditsOnStartup() throws IOException {
Configuration conf = new HdfsConfiguration();
@ -1943,7 +1994,6 @@ public class TestCheckpoint {
* Test that, if a storage directory is failed when a checkpoint occurs,
* the non-failed storage directory receives the checkpoint.
*/
@SuppressWarnings("deprecation")
@Test
public void testCheckpointWithFailedStorageDir() throws Exception {
MiniDFSCluster cluster = null;
@ -2006,7 +2056,6 @@ public class TestCheckpoint {
* should function correctly.
* @throws Exception
*/
@SuppressWarnings("deprecation")
@Test
public void testCheckpointWithSeparateDirsAfterNameFails() throws Exception {
MiniDFSCluster cluster = null;

View File

@ -251,6 +251,8 @@ Release 2.1.1-beta - UNRELEASED
commands to reboot, so that client can continue to track the overall job.
(Jian He via vinodkv)
MAPREDUCE-5475. MRClientService does not verify ACLs properly (jlowe)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES
@ -1329,6 +1331,8 @@ Release 0.23.10 - UNRELEASED
MAPREDUCE-5001. LocalJobRunner has race condition resulting in job
failures (Sandy Ryza via jlowe)
MAPREDUCE-5475. MRClientService does not verify ACLs properly (jlowe)
Release 0.23.9 - 2013-07-08
INCOMPATIBLE CHANGES

View File

@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
@ -78,6 +79,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -175,16 +178,22 @@ public class MRClientService extends AbstractService
return getBindAddress();
}
private Job verifyAndGetJob(JobId jobID,
boolean modifyAccess) throws IOException {
private Job verifyAndGetJob(JobId jobID,
JobACL accessType) throws IOException {
Job job = appContext.getJob(jobID);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
if (!job.checkAccess(ugi, accessType)) {
throw new AccessControlException("User " + ugi.getShortUserName()
+ " cannot perform operation " + accessType.name() + " on "
+ jobID);
}
return job;
}
private Task verifyAndGetTask(TaskId taskID,
boolean modifyAccess) throws IOException {
JobACL accessType) throws IOException {
Task task = verifyAndGetJob(taskID.getJobId(),
modifyAccess).getTask(taskID);
accessType).getTask(taskID);
if (task == null) {
throw new IOException("Unknown Task " + taskID);
}
@ -192,9 +201,9 @@ public class MRClientService extends AbstractService
}
private TaskAttempt verifyAndGetAttempt(TaskAttemptId attemptID,
boolean modifyAccess) throws IOException {
JobACL accessType) throws IOException {
TaskAttempt attempt = verifyAndGetTask(attemptID.getTaskId(),
modifyAccess).getAttempt(attemptID);
accessType).getAttempt(attemptID);
if (attempt == null) {
throw new IOException("Unknown TaskAttempt " + attemptID);
}
@ -205,7 +214,7 @@ public class MRClientService extends AbstractService
public GetCountersResponse getCounters(GetCountersRequest request)
throws IOException {
JobId jobId = request.getJobId();
Job job = verifyAndGetJob(jobId, false);
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
GetCountersResponse response =
recordFactory.newRecordInstance(GetCountersResponse.class);
response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
@ -216,7 +225,7 @@ public class MRClientService extends AbstractService
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
JobId jobId = request.getJobId();
Job job = verifyAndGetJob(jobId, false);
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
GetJobReportResponse response =
recordFactory.newRecordInstance(GetJobReportResponse.class);
if (job != null) {
@ -235,7 +244,7 @@ public class MRClientService extends AbstractService
GetTaskAttemptReportResponse response =
recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
response.setTaskAttemptReport(
verifyAndGetAttempt(taskAttemptId, false).getReport());
verifyAndGetAttempt(taskAttemptId, JobACL.VIEW_JOB).getReport());
return response;
}
@ -245,7 +254,8 @@ public class MRClientService extends AbstractService
TaskId taskId = request.getTaskId();
GetTaskReportResponse response =
recordFactory.newRecordInstance(GetTaskReportResponse.class);
response.setTaskReport(verifyAndGetTask(taskId, false).getReport());
response.setTaskReport(
verifyAndGetTask(taskId, JobACL.VIEW_JOB).getReport());
return response;
}
@ -256,7 +266,7 @@ public class MRClientService extends AbstractService
JobId jobId = request.getJobId();
int fromEventId = request.getFromEventId();
int maxEvents = request.getMaxEvents();
Job job = verifyAndGetJob(jobId, false);
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
GetTaskAttemptCompletionEventsResponse response =
recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
@ -270,9 +280,11 @@ public class MRClientService extends AbstractService
public KillJobResponse killJob(KillJobRequest request)
throws IOException {
JobId jobId = request.getJobId();
String message = "Kill Job received from client " + jobId;
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
String message = "Kill job " + jobId + " received from " + callerUGI
+ " at " + Server.getRemoteAddress();
LOG.info(message);
verifyAndGetJob(jobId, true);
verifyAndGetJob(jobId, JobACL.MODIFY_JOB);
appContext.getEventHandler().handle(
new JobDiagnosticsUpdateEvent(jobId, message));
appContext.getEventHandler().handle(
@ -287,9 +299,11 @@ public class MRClientService extends AbstractService
public KillTaskResponse killTask(KillTaskRequest request)
throws IOException {
TaskId taskId = request.getTaskId();
String message = "Kill task received from client " + taskId;
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
String message = "Kill task " + taskId + " received from " + callerUGI
+ " at " + Server.getRemoteAddress();
LOG.info(message);
verifyAndGetTask(taskId, true);
verifyAndGetTask(taskId, JobACL.MODIFY_JOB);
appContext.getEventHandler().handle(
new TaskEvent(taskId, TaskEventType.T_KILL));
KillTaskResponse response =
@ -302,9 +316,12 @@ public class MRClientService extends AbstractService
public KillTaskAttemptResponse killTaskAttempt(
KillTaskAttemptRequest request) throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
String message = "Kill task attempt received from client " + taskAttemptId;
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
String message = "Kill task attempt " + taskAttemptId
+ " received from " + callerUGI + " at "
+ Server.getRemoteAddress();
LOG.info(message);
verifyAndGetAttempt(taskAttemptId, true);
verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB);
appContext.getEventHandler().handle(
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
appContext.getEventHandler().handle(
@ -322,8 +339,8 @@ public class MRClientService extends AbstractService
GetDiagnosticsResponse response =
recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
response.addAllDiagnostics(
verifyAndGetAttempt(taskAttemptId, false).getDiagnostics());
response.addAllDiagnostics(verifyAndGetAttempt(taskAttemptId,
JobACL.VIEW_JOB).getDiagnostics());
return response;
}
@ -332,9 +349,12 @@ public class MRClientService extends AbstractService
public FailTaskAttemptResponse failTaskAttempt(
FailTaskAttemptRequest request) throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
String message = "Fail task attempt received from client " + taskAttemptId;
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
String message = "Fail task attempt " + taskAttemptId
+ " received from " + callerUGI + " at "
+ Server.getRemoteAddress();
LOG.info(message);
verifyAndGetAttempt(taskAttemptId, true);
verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB);
appContext.getEventHandler().handle(
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
appContext.getEventHandler().handle(
@ -356,7 +376,7 @@ public class MRClientService extends AbstractService
GetTaskReportsResponse response =
recordFactory.newRecordInstance(GetTaskReportsResponse.class);
Job job = verifyAndGetJob(jobId, false);
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
Collection<Task> tasks = job.getTasks(taskType).values();
LOG.info("Getting task report for " + taskType + " " + jobId
+ ". Report-size will be " + tasks.size());

View File

@ -18,13 +18,20 @@
package org.apache.hadoop.mapreduce.v2.app;
import static org.junit.Assert.fail;
import java.security.PrivilegedExceptionAction;
import java.util.Iterator;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
@ -32,6 +39,9 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompleti
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -51,6 +61,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@ -169,6 +181,79 @@ public class TestMRClientService {
app.waitForState(job, JobState.SUCCEEDED);
}
@Test
public void testViewAclOnlyCannotModify() throws Exception {
final MRAppWithClientService app = new MRAppWithClientService(1, 0, false);
final Configuration conf = new Configuration();
conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
conf.set(MRJobConfig.JOB_ACL_VIEW_JOB, "viewonlyuser");
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task task = it.next();
app.waitForState(task, TaskState.RUNNING);
TaskAttempt attempt = task.getAttempts().values().iterator().next();
app.waitForState(attempt, TaskAttemptState.RUNNING);
UserGroupInformation viewOnlyUser =
UserGroupInformation.createUserForTesting(
"viewonlyuser", new String[] {});
Assert.assertTrue("viewonlyuser cannot view job",
job.checkAccess(viewOnlyUser, JobACL.VIEW_JOB));
Assert.assertFalse("viewonlyuser can modify job",
job.checkAccess(viewOnlyUser, JobACL.MODIFY_JOB));
MRClientProtocol client = viewOnlyUser.doAs(
new PrivilegedExceptionAction<MRClientProtocol>() {
@Override
public MRClientProtocol run() throws Exception {
YarnRPC rpc = YarnRPC.create(conf);
return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
app.clientService.getBindAddress(), conf);
}
});
KillJobRequest killJobRequest = recordFactory.newRecordInstance(
KillJobRequest.class);
killJobRequest.setJobId(app.getJobId());
try {
client.killJob(killJobRequest);
fail("viewonlyuser killed job");
} catch (AccessControlException e) {
// pass
}
KillTaskRequest killTaskRequest = recordFactory.newRecordInstance(
KillTaskRequest.class);
killTaskRequest.setTaskId(task.getID());
try {
client.killTask(killTaskRequest);
fail("viewonlyuser killed task");
} catch (AccessControlException e) {
// pass
}
KillTaskAttemptRequest killTaskAttemptRequest =
recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
killTaskAttemptRequest.setTaskAttemptId(attempt.getID());
try {
client.killTaskAttempt(killTaskAttemptRequest);
fail("viewonlyuser killed task attempt");
} catch (AccessControlException e) {
// pass
}
FailTaskAttemptRequest failTaskAttemptRequest =
recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
failTaskAttemptRequest.setTaskAttemptId(attempt.getID());
try {
client.failTaskAttempt(failTaskAttemptRequest);
fail("viewonlyuser killed task attempt");
} catch (AccessControlException e) {
// pass
}
}
private void verifyJobReport(JobReport jr) {
Assert.assertNotNull("JobReport is null", jr);
List<AMInfo> amInfos = jr.getAMInfos();

View File

@ -86,6 +86,9 @@ Release 2.1.1-beta - UNRELEASED
applications together with running apps by default, following up YARN-1074.
(Xuan Gong via vinodkv)
YARN-1065. NM should provide AuxillaryService data to the container (Xuan
Gong via bikas)
OPTIMIZATIONS
BUG FIXES
@ -149,6 +152,10 @@ Release 2.1.1-beta - UNRELEASED
YARN-1077. Fixed TestContainerLaunch test failure on Windows. (Chuan Liu via
vinodkv)
YARN-957. Fixed a bug in CapacityScheduler because of which requests that
need more than a node's total capability were incorrectly allocated on that
node causing apps to hang. (Omkar Vinit Joshi via vinodkv)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.commons.codec.binary.Base64;
public class AuxiliaryServiceHelper {
public final static String NM_AUX_SERVICE = "NM_AUX_SERVICE_";
public static ByteBuffer getServiceDataFromEnv(String serviceName,
Map<String, String> env) {
byte[] metaData =
Base64.decodeBase64(env.get(getPrefixServiceName(serviceName)));
return ByteBuffer.wrap(metaData);
}
public static void setServiceDataIntoEnv(String serviceName,
ByteBuffer metaData, Map<String, String> env) {
byte[] byteData = metaData.array();
env.put(getPrefixServiceName(serviceName),
Base64.encodeBase64String(byteData));
}
private static String getPrefixServiceName(String serviceName) {
return NM_AUX_SERVICE + serviceName;
}
}

View File

@ -216,7 +216,7 @@ public class ContainerManagerImpl extends CompositeService implements
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler);
return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);
}
@Override
@ -410,7 +410,7 @@ public class ContainerManagerImpl extends CompositeService implements
}
}
return StartContainersResponse.newInstance(auxiliaryServices.getMetaData(),
return StartContainersResponse.newInstance(getAuxServiceMetaData(),
succeededContainers, failedContainers);
}
@ -759,4 +759,7 @@ public class ContainerManagerImpl extends CompositeService implements
return this.context;
}
public Map<String, ByteBuffer> getAuxServiceMetaData() {
return this.auxiliaryServices.getMetaData();
}
}

View File

@ -26,6 +26,7 @@ import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
@ -60,6 +61,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
@ -70,6 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.hadoop.yarn.util.ConverterUtils;
public class ContainerLaunch implements Callable<Integer> {
@ -88,6 +91,7 @@ public class ContainerLaunch implements Callable<Integer> {
private final Container container;
private final Configuration conf;
private final Context context;
private final ContainerManagerImpl containerManager;
private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
private volatile AtomicBoolean completed = new AtomicBoolean(false);
@ -101,7 +105,8 @@ public class ContainerLaunch implements Callable<Integer> {
public ContainerLaunch(Context context, Configuration configuration,
Dispatcher dispatcher, ContainerExecutor exec, Application app,
Container container, LocalDirsHandlerService dirsHandler) {
Container container, LocalDirsHandlerService dirsHandler,
ContainerManagerImpl containerManager) {
this.context = context;
this.conf = configuration;
this.app = app;
@ -109,6 +114,7 @@ public class ContainerLaunch implements Callable<Integer> {
this.container = container;
this.dispatcher = dispatcher;
this.dirsHandler = dirsHandler;
this.containerManager = containerManager;
this.sleepDelayBeforeSigKill =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS);
@ -227,7 +233,6 @@ public class ContainerLaunch implements Callable<Integer> {
ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME,
new Path(containerWorkDir,
FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
// Sanitize the container's environment
sanitizeEnv(environment, containerWorkDir, appDirs, containerLogDirs,
localResources);
@ -680,6 +685,12 @@ public class ContainerLaunch implements Callable<Integer> {
environment.put(Environment.CLASSPATH.name(), classPathJar);
}
}
// put AuxiliaryService data to environment
for (Map.Entry<String, ByteBuffer> meta : containerManager
.getAuxServiceMetaData().entrySet()) {
AuxiliaryServiceHelper.setServiceDataIntoEnv(
meta.getKey(), meta.getValue(), environment);
}
}
static void writeLaunchEnv(OutputStream out,

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
@ -65,6 +66,7 @@ public class ContainersLauncher extends AbstractService
private final Context context;
private final ContainerExecutor exec;
private final Dispatcher dispatcher;
private final ContainerManagerImpl containerManager;
private LocalDirsHandlerService dirsHandler;
@VisibleForTesting
@ -89,12 +91,14 @@ public class ContainersLauncher extends AbstractService
public ContainersLauncher(Context context, Dispatcher dispatcher,
ContainerExecutor exec, LocalDirsHandlerService dirsHandler) {
ContainerExecutor exec, LocalDirsHandlerService dirsHandler,
ContainerManagerImpl containerManager) {
super("containers-launcher");
this.exec = exec;
this.context = context;
this.dispatcher = dispatcher;
this.dirsHandler = dirsHandler;
this.containerManager = containerManager;
}
@Override
@ -128,7 +132,7 @@ public class ContainersLauncher extends AbstractService
ContainerLaunch launch =
new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
event.getContainer(), dirsHandler);
event.getContainer(), dirsHandler, containerManager);
running.put(containerId,
new RunningContainer(containerLauncher.submit(launch),
launch));

View File

@ -145,7 +145,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
return new ContainersLauncher(context, super.dispatcher, exec,
super.dirsHandler) {
super.dirsHandler, this) {
@Override
public void handle(ContainersLauncherEvent event) {
Container container = event.getContainer();

View File

@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import junit.framework.Assert;
@ -211,6 +214,16 @@ public abstract class BaseContainerManagerTest {
NMTokenIdentifier nmTokenIdentifier) throws InvalidToken {
// Do nothing
}
@Override
public Map<String, ByteBuffer> getAuxServiceMetaData() {
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
serviceData.put("AuxService1",
ByteBuffer.wrap("AuxServiceMetaData1".getBytes()));
serviceData.put("AuxService2",
ByteBuffer.wrap("AuxServiceMetaData2".getBytes()));
return serviceData;
}
};
}

View File

@ -650,7 +650,7 @@ public class TestContainer {
Context context = mock(Context.class);
when(context.getApplications()).thenReturn(
new ConcurrentHashMap<ApplicationId, Application>());
launcher = new ContainersLauncher(context, dispatcher, null, null);
launcher = new ContainersLauncher(context, dispatcher, null, null, null);
// create a mock ExecutorService, which will not really launch
// ContainerLaunch at all.
launcher.containerLauncher = mock(ExecutorService.class);

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import java.io.BufferedReader;
import java.io.File;
@ -28,6 +29,7 @@ import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -37,6 +39,7 @@ import java.util.Map;
import junit.framework.Assert;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@ -70,11 +73,13 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
@ -381,6 +386,12 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
+ processStartFile);
fileWriter.println("@echo " + Environment.HOME.$() + ">> "
+ processStartFile);
for (String serviceName : containerManager.getAuxServiceMetaData()
.keySet()) {
fileWriter.println("@echo" + AuxiliaryServiceHelper.NM_AUX_SERVICE
+ serviceName + " >> "
+ processStartFile);
}
fileWriter.println("@echo " + cId + ">> " + processStartFile);
fileWriter.println("@ping -n 100 127.0.0.1 >nul");
} else {
@ -403,6 +414,12 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
+ processStartFile);
fileWriter.write("\necho $" + Environment.HOME.name() + " >> "
+ processStartFile);
for (String serviceName : containerManager.getAuxServiceMetaData()
.keySet()) {
fileWriter.write("\necho $" + AuxiliaryServiceHelper.NM_AUX_SERVICE
+ serviceName + " >> "
+ processStartFile);
}
fileWriter.write("\necho $$ >> " + processStartFile);
fileWriter.write("\nexec sleep 100");
}
@ -487,6 +504,12 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
YarnConfiguration.DEFAULT_NM_USER_HOME_DIR),
reader.readLine());
for (String serviceName : containerManager.getAuxServiceMetaData().keySet()) {
Assert.assertEquals(
containerManager.getAuxServiceMetaData().get(serviceName),
ByteBuffer.wrap(Base64.decodeBase64(reader.readLine().getBytes())));
}
Assert.assertEquals(cId.toString(), containerLaunchContext
.getEnvironment().get(Environment.CONTAINER_ID.name()));
Assert.assertEquals(context.getNodeId().getHost(), containerLaunchContext
@ -557,6 +580,16 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
DefaultContainerExecutor.containerIsAlive(pid));
}
@Test (timeout = 5000)
public void testAuxiliaryServiceHelper() throws Exception {
Map<String, String> env = new HashMap<String, String>();
String serviceName = "testAuxiliaryService";
ByteBuffer bb = ByteBuffer.wrap("testAuxiliaryService".getBytes());
AuxiliaryServiceHelper.setServiceDataIntoEnv(serviceName, bb, env);
Assert.assertEquals(bb,
AuxiliaryServiceHelper.getServiceDataFromEnv(serviceName, env));
}
@Test
public void testDelayedKill() throws Exception {
containerManager.start();
@ -703,7 +736,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
};
when(dispatcher.getEventHandler()).thenReturn(eventHandler);
ContainerLaunch launch = new ContainerLaunch(context, new Configuration(),
dispatcher, exec, null, container, dirsHandler);
dispatcher, exec, null, container, dirsHandler, containerManager);
launch.call();
}

View File

@ -67,4 +67,9 @@ public abstract class SchedulerNode {
*/
public abstract int getNumContainers();
/**
* Get total resources on the node.
* @return total resources on the node.
*/
public abstract Resource getTotalResource();
}

View File

@ -1308,9 +1308,15 @@ public class LeafQueue implements CSQueue {
+ " request=" + request + " type=" + type);
}
Resource capability = request.getCapability();
Resource available = node.getAvailableResource();
Resource totalResource = node.getTotalResource();
if (!Resources.fitsIn(capability, totalResource)) {
LOG.warn("Node : " + node.getNodeID()
+ " does not have sufficient resource for request : " + request
+ " node total capability : " + node.getTotalResource());
return Resources.none();
}
assert Resources.greaterThan(
resourceCalculator, clusterResource, available, Resources.none());

View File

@ -49,6 +49,7 @@ public class FiCaSchedulerNode extends SchedulerNode {
private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
private Resource totalResourceCapability;
private volatile int numContainers;
@ -65,6 +66,9 @@ public class FiCaSchedulerNode extends SchedulerNode {
this.rmNode = node;
this.availableResource.setMemory(node.getTotalCapability().getMemory());
this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores());
totalResourceCapability =
Resource.newInstance(node.getTotalCapability().getMemory(), node
.getTotalCapability().getVirtualCores());
if (usePortForNodeName) {
nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
} else {
@ -126,6 +130,11 @@ public class FiCaSchedulerNode extends SchedulerNode {
return this.usedResource;
}
@Override
public Resource getTotalResource() {
return this.totalResourceCapability;
}
private synchronized boolean isValidContainer(Container c) {
if (launchedContainers.containsKey(c.getId()))
return true;

View File

@ -52,6 +52,7 @@ public class FSSchedulerNode extends SchedulerNode {
private Resource availableResource;
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
private Resource totalResourceCapability;
private volatile int numContainers;
@ -68,6 +69,9 @@ public class FSSchedulerNode extends SchedulerNode {
public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
this.rmNode = node;
this.availableResource = Resources.clone(node.getTotalCapability());
totalResourceCapability =
Resource.newInstance(node.getTotalCapability().getMemory(), node
.getTotalCapability().getVirtualCores());
if (usePortForNodeName) {
nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
} else {
@ -173,6 +177,11 @@ public class FSSchedulerNode extends SchedulerNode {
Resources.subtractFrom(usedResource, resource);
}
@Override
public Resource getTotalResource() {
return this.totalResourceCapability;
}
private synchronized void deductAvailableResource(Resource resource) {
if (resource == null) {
LOG.error("Invalid deduction of null resource for "

View File

@ -232,6 +232,14 @@ public class MockRM extends ResourceManager {
return nm;
}
public MockNM registerNode(String nodeIdStr, int memory, int vCores)
throws Exception {
MockNM nm =
new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService());
nm.registerNode();
return nm;
}
public void sendNodeStarted(MockNM nm) throws Exception {
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
nm.getNodeId());

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.junit.Test;
public class TestContainerAllocation {
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
private final int GB = 1024;
@Test(timeout = 3000000)
public void testExcessReservationThanNodeManagerCapacity() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
MockRM rm = new MockRM(conf);
rm.start();
// Register node1
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 2 * GB, 4);
MockNM nm2 = rm.registerNode("127.0.0.1:2234", 3 * GB, 4);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
// wait..
int waitCount = 20;
int size = rm.getRMContext().getRMNodes().size();
while ((size = rm.getRMContext().getRMNodes().size()) != 2
&& waitCount-- > 0) {
LOG.info("Waiting for node managers to register : " + size);
Thread.sleep(100);
}
Assert.assertEquals(2, rm.getRMContext().getRMNodes().size());
// Submit an application
RMApp app1 = rm.submitApp(128);
// kick the scheduling
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
LOG.info("sending container requests ");
am1.addRequests(new String[] {"*"}, 3 * GB, 1, 1);
AllocateResponse alloc1Response = am1.schedule(); // send the request
// kick the scheduler
nm1.nodeHeartbeat(true);
int waitCounter = 20;
LOG.info("heartbeating nm1");
while (alloc1Response.getAllocatedContainers().size() < 1
&& waitCounter-- > 0) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(500);
alloc1Response = am1.schedule();
}
LOG.info("received container : "
+ alloc1Response.getAllocatedContainers().size());
// No container should be allocated.
// Internally it should not been reserved.
Assert.assertTrue(alloc1Response.getAllocatedContainers().size() == 0);
LOG.info("heartbeating nm2");
waitCounter = 20;
nm2.nodeHeartbeat(true);
while (alloc1Response.getAllocatedContainers().size() < 1
&& waitCounter-- > 0) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(500);
alloc1Response = am1.schedule();
}
LOG.info("received container : "
+ alloc1Response.getAllocatedContainers().size());
Assert.assertTrue(alloc1Response.getAllocatedContainers().size() == 1);
rm.stop();
}
}