Merge r1470760 through r1471228 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1471229 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-04-24 02:21:58 +00:00
commit 5f1e3b561a
56 changed files with 999 additions and 418 deletions

View File

@ -326,6 +326,10 @@ Trunk (Unreleased)
HDFS-4646. createNNProxyWithClientProtocol ignores configured timeout HDFS-4646. createNNProxyWithClientProtocol ignores configured timeout
value (Jagane Sundar via cos) value (Jagane Sundar via cos)
HDFS-4732. Fix TestDFSUpgradeFromImage which fails on Windows due to
failure to unpack old image tarball that contains hard links.
(Chris Nauroth via szetszwo)
BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
HDFS-4145. Merge hdfs cmd line scripts from branch-1-win. (David Lao, HDFS-4145. Merge hdfs cmd line scripts from branch-1-win. (David Lao,
@ -360,6 +364,9 @@ Trunk (Unreleased)
HDFS-4674. TestBPOfferService fails on Windows due to failure parsing HDFS-4674. TestBPOfferService fails on Windows due to failure parsing
datanode data directory as URI. (Chris Nauroth via suresh) datanode data directory as URI. (Chris Nauroth via suresh)
HDFS-4725. Fix HDFS file handle leaks in FSEditLog, NameNode,
OfflineEditsBinaryLoader and some tests. (Chris Nauroth via szetszwo)
BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes. HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.
@ -573,6 +580,8 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4695. TestEditLog leaks open file handles between tests. HDFS-4695. TestEditLog leaks open file handles between tests.
(Ivan Mitic via suresh) (Ivan Mitic via suresh)
HDFS-4737. JVM path embedded in fuse binaries. (Sean Mackrory via atm)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -322,21 +322,23 @@ public class FSEditLog implements LogsPurgeable {
LOG.debug("Closing log when already closed"); LOG.debug("Closing log when already closed");
return; return;
} }
if (state == State.IN_SEGMENT) {
assert editLogStream != null;
waitForSyncToFinish();
endCurrentLogSegment(true);
}
if (journalSet != null && !journalSet.isEmpty()) {
try {
journalSet.close();
} catch (IOException ioe) {
LOG.warn("Error closing journalSet", ioe);
}
}
state = State.CLOSED; try {
if (state == State.IN_SEGMENT) {
assert editLogStream != null;
waitForSyncToFinish();
endCurrentLogSegment(true);
}
} finally {
if (journalSet != null && !journalSet.isEmpty()) {
try {
journalSet.close();
} catch (IOException ioe) {
LOG.warn("Error closing journalSet", ioe);
}
}
state = State.CLOSED;
}
} }
@ -588,6 +590,7 @@ public class FSEditLog implements LogsPurgeable {
"due to " + e.getMessage() + ". " + "due to " + e.getMessage() + ". " +
"Unsynced transactions: " + (txid - synctxid); "Unsynced transactions: " + (txid - synctxid);
LOG.fatal(msg, new Exception()); LOG.fatal(msg, new Exception());
IOUtils.cleanup(LOG, journalSet);
terminate(1, msg); terminate(1, msg);
} }
} finally { } finally {
@ -611,6 +614,7 @@ public class FSEditLog implements LogsPurgeable {
"Could not sync enough journals to persistent storage. " "Could not sync enough journals to persistent storage. "
+ "Unsynced transactions: " + (txid - synctxid); + "Unsynced transactions: " + (txid - synctxid);
LOG.fatal(msg, new Exception()); LOG.fatal(msg, new Exception());
IOUtils.cleanup(LOG, journalSet);
terminate(1, msg); terminate(1, msg);
} }
} }

View File

@ -651,13 +651,14 @@ public class NameNode {
} }
} catch (ServiceFailedException e) { } catch (ServiceFailedException e) {
LOG.warn("Encountered exception while exiting state ", e); LOG.warn("Encountered exception while exiting state ", e);
} } finally {
stopCommonServices(); stopCommonServices();
if (metrics != null) { if (metrics != null) {
metrics.shutdown(); metrics.shutdown();
} }
if (namesystem != null) { if (namesystem != null) {
namesystem.shutdown(); namesystem.shutdown();
}
} }
} }

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.tools.offlineEditsViewer.OfflineEditsViewer;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.io.IOUtils;
/** /**
* OfflineEditsBinaryLoader loads edits from a binary edits file * OfflineEditsBinaryLoader loads edits from a binary edits file
@ -59,43 +60,49 @@ class OfflineEditsBinaryLoader implements OfflineEditsLoader {
*/ */
@Override @Override
public void loadEdits() throws IOException { public void loadEdits() throws IOException {
visitor.start(inputStream.getVersion()); try {
while (true) { visitor.start(inputStream.getVersion());
try { while (true) {
FSEditLogOp op = inputStream.readOp(); try {
if (op == null) FSEditLogOp op = inputStream.readOp();
break; if (op == null)
if (fixTxIds) { break;
if (nextTxId <= 0) { if (fixTxIds) {
nextTxId = op.getTransactionId();
if (nextTxId <= 0) { if (nextTxId <= 0) {
nextTxId = 1; nextTxId = op.getTransactionId();
if (nextTxId <= 0) {
nextTxId = 1;
}
} }
op.setTransactionId(nextTxId);
nextTxId++;
} }
op.setTransactionId(nextTxId); visitor.visitOp(op);
nextTxId++; } catch (IOException e) {
if (!recoveryMode) {
// Tell the visitor to clean up, then re-throw the exception
LOG.error("Got IOException at position " +
inputStream.getPosition());
visitor.close(e);
throw e;
}
LOG.error("Got IOException while reading stream! Resyncing.", e);
inputStream.resync();
} catch (RuntimeException e) {
if (!recoveryMode) {
// Tell the visitor to clean up, then re-throw the exception
LOG.error("Got RuntimeException at position " +
inputStream.getPosition());
visitor.close(e);
throw e;
}
LOG.error("Got RuntimeException while reading stream! Resyncing.", e);
inputStream.resync();
} }
visitor.visitOp(op);
} catch (IOException e) {
if (!recoveryMode) {
// Tell the visitor to clean up, then re-throw the exception
LOG.error("Got IOException at position " + inputStream.getPosition());
visitor.close(e);
throw e;
}
LOG.error("Got IOException while reading stream! Resyncing.", e);
inputStream.resync();
} catch (RuntimeException e) {
if (!recoveryMode) {
// Tell the visitor to clean up, then re-throw the exception
LOG.error("Got RuntimeException at position " + inputStream.getPosition());
visitor.close(e);
throw e;
}
LOG.error("Got RuntimeException while reading stream! Resyncing.", e);
inputStream.resync();
} }
visitor.close(null);
} finally {
IOUtils.cleanup(LOG, inputStream);
} }
visitor.close(null);
} }
} }

View File

@ -16,6 +16,8 @@
# limitations under the License. # limitations under the License.
# #
set(CMAKE_SKIP_RPATH TRUE)
# Find Linux FUSE # Find Linux FUSE
IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux") IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
find_package(PkgConfig REQUIRED) find_package(PkgConfig REQUIRED)

View File

@ -48,6 +48,8 @@ import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -91,6 +93,8 @@ import com.google.common.base.Joiner;
/** Utilities for HDFS tests */ /** Utilities for HDFS tests */
public class DFSTestUtil { public class DFSTestUtil {
private static final Log LOG = LogFactory.getLog(DFSTestUtil.class);
private static Random gen = new Random(); private static Random gen = new Random();
private static String[] dirNames = { private static String[] dirNames = {
@ -742,7 +746,11 @@ public class DFSTestUtil {
File file = new File(filename); File file = new File(filename);
DataInputStream in = new DataInputStream(new FileInputStream(file)); DataInputStream in = new DataInputStream(new FileInputStream(file));
byte[] content = new byte[(int)file.length()]; byte[] content = new byte[(int)file.length()];
in.readFully(content); try {
in.readFully(content);
} finally {
IOUtils.cleanup(LOG, in);
}
return content; return content;
} }

View File

@ -252,8 +252,9 @@ public class TestDFSUpgradeFromImage {
// Now try to start an NN from it // Now try to start an NN from it
MiniDFSCluster cluster = null;
try { try {
new MiniDFSCluster.Builder(conf).numDataNodes(0) cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false) .format(false)
.manageDataDfsDirs(false) .manageDataDfsDirs(false)
.manageNameDfsDirs(false) .manageNameDfsDirs(false)
@ -264,6 +265,11 @@ public class TestDFSUpgradeFromImage {
if (!ioe.toString().contains("Old layout version is 'too old'")) { if (!ioe.toString().contains("Old layout version is 'too old'")) {
throw ioe; throw ioe;
} }
} finally {
// We expect startup to fail, but just in case it didn't, shutdown now.
if (cluster != null) {
cluster.shutdown();
}
} }
} }

View File

@ -631,44 +631,48 @@ public class TestDistributedFileSystem {
true); true);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2).build(); .numDataNodes(2).build();
DistributedFileSystem fs = cluster.getFileSystem(); try {
// Create two files DistributedFileSystem fs = cluster.getFileSystem();
Path tmpFile1 = new Path("/tmpfile1.dat"); // Create two files
Path tmpFile2 = new Path("/tmpfile2.dat"); Path tmpFile1 = new Path("/tmpfile1.dat");
DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl); Path tmpFile2 = new Path("/tmpfile2.dat");
DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl); DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl);
// Get locations of blocks of both files and concat together DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl);
BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024); // Get locations of blocks of both files and concat together
BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024); BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024);
BlockLocation[] blockLocs = (BlockLocation[]) ArrayUtils.addAll(blockLocs1, BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024);
blockLocs2); BlockLocation[] blockLocs = (BlockLocation[]) ArrayUtils.addAll(blockLocs1,
// Fetch VolumeBlockLocations in batch blockLocs2);
BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays // Fetch VolumeBlockLocations in batch
.asList(blockLocs)); BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
int counter = 0; .asList(blockLocs));
// Print out the list of ids received for each block int counter = 0;
for (BlockStorageLocation l : locs) { // Print out the list of ids received for each block
for (int i = 0; i < l.getVolumeIds().length; i++) { for (BlockStorageLocation l : locs) {
VolumeId id = l.getVolumeIds()[i]; for (int i = 0; i < l.getVolumeIds().length; i++) {
String name = l.getNames()[i]; VolumeId id = l.getVolumeIds()[i];
if (id != null) { String name = l.getNames()[i];
System.out.println("Datanode " + name + " has block " + counter if (id != null) {
+ " on volume id " + id.toString()); System.out.println("Datanode " + name + " has block " + counter
+ " on volume id " + id.toString());
}
}
counter++;
}
assertEquals("Expected two HdfsBlockLocations for two 1-block files", 2,
locs.length);
for (BlockStorageLocation l : locs) {
assertEquals("Expected two replicas for each block", 2,
l.getVolumeIds().length);
for (int i = 0; i < l.getVolumeIds().length; i++) {
VolumeId id = l.getVolumeIds()[i];
String name = l.getNames()[i];
assertTrue("Expected block to be valid on datanode " + name,
id.isValid());
} }
} }
counter++; } finally {
} cluster.shutdown();
assertEquals("Expected two HdfsBlockLocations for two 1-block files", 2,
locs.length);
for (BlockStorageLocation l : locs) {
assertEquals("Expected two replicas for each block", 2,
l.getVolumeIds().length);
for (int i = 0; i < l.getVolumeIds().length; i++) {
VolumeId id = l.getVolumeIds()[i];
String name = l.getNames()[i];
assertTrue("Expected block to be valid on datanode " + name,
id.isValid());
}
} }
} }
@ -683,27 +687,31 @@ public class TestDistributedFileSystem {
true); true);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2).build(); .numDataNodes(2).build();
cluster.getDataNodes(); try {
DistributedFileSystem fs = cluster.getFileSystem(); cluster.getDataNodes();
// Create a file DistributedFileSystem fs = cluster.getFileSystem();
Path tmpFile = new Path("/tmpfile1.dat"); // Create a file
DFSTestUtil.createFile(fs, tmpFile, 1024, (short) 2, 0xDEADDEADl); Path tmpFile = new Path("/tmpfile1.dat");
// Get locations of blocks of the file DFSTestUtil.createFile(fs, tmpFile, 1024, (short) 2, 0xDEADDEADl);
BlockLocation[] blockLocs = fs.getFileBlockLocations(tmpFile, 0, 1024); // Get locations of blocks of the file
// Stop a datanode to simulate a failure BlockLocation[] blockLocs = fs.getFileBlockLocations(tmpFile, 0, 1024);
cluster.stopDataNode(0); // Stop a datanode to simulate a failure
// Fetch VolumeBlockLocations cluster.stopDataNode(0);
BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays // Fetch VolumeBlockLocations
.asList(blockLocs)); BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
.asList(blockLocs));
assertEquals("Expected one HdfsBlockLocation for one 1-block file", 1, assertEquals("Expected one HdfsBlockLocation for one 1-block file", 1,
locs.length); locs.length);
for (BlockStorageLocation l : locs) { for (BlockStorageLocation l : locs) {
assertEquals("Expected two replicas for each block", 2, assertEquals("Expected two replicas for each block", 2,
l.getVolumeIds().length); l.getVolumeIds().length);
assertTrue("Expected one valid and one invalid replica", assertTrue("Expected one valid and one invalid replica",
(l.getVolumeIds()[0].isValid()) ^ (l.getVolumeIds()[1].isValid())); (l.getVolumeIds()[0].isValid()) ^ (l.getVolumeIds()[1].isValid()));
}
} finally {
cluster.shutdown();
} }
} }

View File

@ -781,49 +781,53 @@ public class TestQuota {
public void testMaxSpaceQuotas() throws Exception { public void testMaxSpaceQuotas() throws Exception {
final Configuration conf = new HdfsConfiguration(); final Configuration conf = new HdfsConfiguration();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
final FileSystem fs = cluster.getFileSystem();
assertTrue("Not a HDFS: "+fs.getUri(),
fs instanceof DistributedFileSystem);
final DistributedFileSystem dfs = (DistributedFileSystem)fs;
// create test directory
final Path testFolder = new Path("/testFolder");
assertTrue(dfs.mkdirs(testFolder));
// setting namespace quota to Long.MAX_VALUE - 1 should work
dfs.setQuota(testFolder, Long.MAX_VALUE - 1, 10);
ContentSummary c = dfs.getContentSummary(testFolder);
assertTrue("Quota not set properly", c.getQuota() == Long.MAX_VALUE - 1);
// setting diskspace quota to Long.MAX_VALUE - 1 should work
dfs.setQuota(testFolder, 10, Long.MAX_VALUE - 1);
c = dfs.getContentSummary(testFolder);
assertTrue("Quota not set properly", c.getSpaceQuota() == Long.MAX_VALUE - 1);
// setting namespace quota to Long.MAX_VALUE should not work + no error
dfs.setQuota(testFolder, Long.MAX_VALUE, 10);
c = dfs.getContentSummary(testFolder);
assertTrue("Quota should not have changed", c.getQuota() == 10);
// setting diskspace quota to Long.MAX_VALUE should not work + no error
dfs.setQuota(testFolder, 10, Long.MAX_VALUE);
c = dfs.getContentSummary(testFolder);
assertTrue("Quota should not have changed", c.getSpaceQuota() == 10);
// setting namespace quota to Long.MAX_VALUE + 1 should not work + error
try { try {
dfs.setQuota(testFolder, Long.MAX_VALUE + 1, 10); final FileSystem fs = cluster.getFileSystem();
fail("Exception not thrown"); assertTrue("Not a HDFS: "+fs.getUri(),
} catch (IllegalArgumentException e) { fs instanceof DistributedFileSystem);
// Expected final DistributedFileSystem dfs = (DistributedFileSystem)fs;
}
// setting diskspace quota to Long.MAX_VALUE + 1 should not work + error // create test directory
try { final Path testFolder = new Path("/testFolder");
dfs.setQuota(testFolder, 10, Long.MAX_VALUE + 1); assertTrue(dfs.mkdirs(testFolder));
fail("Exception not thrown");
} catch (IllegalArgumentException e) { // setting namespace quota to Long.MAX_VALUE - 1 should work
// Expected dfs.setQuota(testFolder, Long.MAX_VALUE - 1, 10);
ContentSummary c = dfs.getContentSummary(testFolder);
assertTrue("Quota not set properly", c.getQuota() == Long.MAX_VALUE - 1);
// setting diskspace quota to Long.MAX_VALUE - 1 should work
dfs.setQuota(testFolder, 10, Long.MAX_VALUE - 1);
c = dfs.getContentSummary(testFolder);
assertTrue("Quota not set properly", c.getSpaceQuota() == Long.MAX_VALUE - 1);
// setting namespace quota to Long.MAX_VALUE should not work + no error
dfs.setQuota(testFolder, Long.MAX_VALUE, 10);
c = dfs.getContentSummary(testFolder);
assertTrue("Quota should not have changed", c.getQuota() == 10);
// setting diskspace quota to Long.MAX_VALUE should not work + no error
dfs.setQuota(testFolder, 10, Long.MAX_VALUE);
c = dfs.getContentSummary(testFolder);
assertTrue("Quota should not have changed", c.getSpaceQuota() == 10);
// setting namespace quota to Long.MAX_VALUE + 1 should not work + error
try {
dfs.setQuota(testFolder, Long.MAX_VALUE + 1, 10);
fail("Exception not thrown");
} catch (IllegalArgumentException e) {
// Expected
}
// setting diskspace quota to Long.MAX_VALUE + 1 should not work + error
try {
dfs.setQuota(testFolder, 10, Long.MAX_VALUE + 1);
fail("Exception not thrown");
} catch (IllegalArgumentException e) {
// Expected
}
} finally {
cluster.shutdown();
} }
} }

View File

@ -255,7 +255,6 @@ public class TestEditLogJournalFailures {
doThrow(new IOException("fail on setReadyToFlush()")).when(spyElos) doThrow(new IOException("fail on setReadyToFlush()")).when(spyElos)
.setReadyToFlush(); .setReadyToFlush();
} }
doNothing().when(spyElos).abort();
} }
private EditLogFileOutputStream spyOnStream(JournalAndStream jas) { private EditLogFileOutputStream spyOnStream(JournalAndStream jas) {

View File

@ -530,7 +530,11 @@ public class TestStartup {
.manageDataDfsDirs(false) .manageDataDfsDirs(false)
.manageNameDfsDirs(false) .manageNameDfsDirs(false)
.build(); .build();
cluster.waitActive(); try {
cluster.waitActive();
} finally {
cluster.shutdown();
}
} }
/** /**

View File

@ -206,6 +206,9 @@ Release 2.0.5-beta - UNRELEASED
instead of extracting and populating information itself to start any instead of extracting and populating information itself to start any
container. (vinodkv) container. (vinodkv)
MAPREDUCE-5175. Updated MR App to not set envs that will be set by NMs
anyways after YARN-561. (Xuan Gong via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method MAPREDUCE-4974. Optimising the LineRecordReader initialize() method

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
/** /**
@ -280,7 +281,7 @@ public class LocalContainerLauncher extends AbstractService implements
// Use the AM's local dir env to generate the intermediate step // Use the AM's local dir env to generate the intermediate step
// output files // output files
String[] localSysDirs = StringUtils.getTrimmedStrings( String[] localSysDirs = StringUtils.getTrimmedStrings(
System.getenv(ApplicationConstants.LOCAL_DIR_ENV)); System.getenv(Environment.LOCAL_DIRS.name()));
conf.setStrings(MRConfig.LOCAL_DIR, localSysDirs); conf.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
LOG.info(MRConfig.LOCAL_DIR + " for uber task: " LOG.info(MRConfig.LOCAL_DIR + " for uber task: "
+ conf.get(MRConfig.LOCAL_DIR)); + conf.get(MRConfig.LOCAL_DIR));

View File

@ -111,8 +111,6 @@ public class MapReduceChildJVM {
MRJobConfig.STDERR_LOGFILE_ENV, MRJobConfig.STDERR_LOGFILE_ENV,
getTaskLogFile(TaskLog.LogName.STDERR) getTaskLogFile(TaskLog.LogName.STDERR)
); );
environment.put(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV,
conf.get(MRJobConfig.APPLICATION_ATTEMPT_ID).toString());
} }
private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) { private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {

View File

@ -57,6 +57,9 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
/** /**
@ -216,7 +219,7 @@ class YarnChild {
*/ */
private static void configureLocalDirs(Task task, JobConf job) throws IOException { private static void configureLocalDirs(Task task, JobConf job) throws IOException {
String[] localSysDirs = StringUtils.getTrimmedStrings( String[] localSysDirs = StringUtils.getTrimmedStrings(
System.getenv(ApplicationConstants.LOCAL_DIR_ENV)); System.getenv(Environment.LOCAL_DIRS.name()));
job.setStrings(MRConfig.LOCAL_DIR, localSysDirs); job.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
LOG.info(MRConfig.LOCAL_DIR + " for child: " + job.get(MRConfig.LOCAL_DIR)); LOG.info(MRConfig.LOCAL_DIR + " for child: " + job.get(MRConfig.LOCAL_DIR));
LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR); LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
@ -256,12 +259,14 @@ class YarnChild {
final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE); final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
job.setCredentials(credentials); job.setCredentials(credentials);
String appAttemptIdEnv = System ApplicationAttemptId appAttemptId =
.getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV); ConverterUtils.toContainerId(
LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv); System.getenv(Environment.CONTAINER_ID.name()))
.getApplicationAttemptId();
LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptId);
// Set it in conf, so as to be able to be used the the OutputCommitter. // Set it in conf, so as to be able to be used the the OutputCommitter.
job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
.parseInt(appAttemptIdEnv)); appAttemptId.getAttemptId());
// set tcp nodelay // set tcp nodelay
job.setBoolean("ipc.client.tcpnodelay", true); job.setBoolean("ipc.client.tcpnodelay", true);

View File

@ -116,6 +116,7 @@ import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -1270,22 +1271,22 @@ public class MRAppMaster extends CompositeService {
try { try {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
String containerIdStr = String containerIdStr =
System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV); System.getenv(Environment.CONTAINER_ID.name());
String nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV); String nodeHostString = System.getenv(Environment.NM_HOST.name());
String nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV); String nodePortString = System.getenv(Environment.NM_PORT.name());
String nodeHttpPortString = String nodeHttpPortString =
System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV); System.getenv(Environment.NM_HTTP_PORT.name());
String appSubmitTimeStr = String appSubmitTimeStr =
System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV); System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
String maxAppAttempts = String maxAppAttempts =
System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV); System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
validateInputParam(containerIdStr, validateInputParam(containerIdStr,
ApplicationConstants.AM_CONTAINER_ID_ENV); Environment.CONTAINER_ID.name());
validateInputParam(nodeHostString, ApplicationConstants.NM_HOST_ENV); validateInputParam(nodeHostString, Environment.NM_HOST.name());
validateInputParam(nodePortString, ApplicationConstants.NM_PORT_ENV); validateInputParam(nodePortString, Environment.NM_PORT.name());
validateInputParam(nodeHttpPortString, validateInputParam(nodeHttpPortString,
ApplicationConstants.NM_HTTP_PORT_ENV); Environment.NM_HTTP_PORT.name());
validateInputParam(appSubmitTimeStr, validateInputParam(appSubmitTimeStr,
ApplicationConstants.APP_SUBMIT_TIME_ENV); ApplicationConstants.APP_SUBMIT_TIME_ENV);
validateInputParam(maxAppAttempts, validateInputParam(maxAppAttempts,

View File

@ -569,8 +569,6 @@ public interface MRJobConfig {
public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV"; public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV"; public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID_ENV";
// This should be the directory where splits file gets localized on the node // This should be the directory where splits file gets localized on the node
// running ApplicationMaster. // running ApplicationMaster.
public static final String JOB_SUBMIT_DIR = "jobSubmitDir"; public static final String JOB_SUBMIT_DIR = "jobSubmitDir";

View File

@ -95,6 +95,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-441. Removed unused utility methods for collections from two API YARN-441. Removed unused utility methods for collections from two API
records. (Xuan Gong via vinodkv) records. (Xuan Gong via vinodkv)
YARN-561. Modified NodeManager to set key information into the environment
of every container that it launches. (Xuan Gong via vinodkv)
NEW FEATURES NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues. YARN-482. FS: Extend SchedulingMode to intermediate queues.
@ -167,6 +170,13 @@ Release 2.0.5-beta - UNRELEASED
YARN-542. Changed the default global AM max-attempts value to be not one. YARN-542. Changed the default global AM max-attempts value to be not one.
(Zhijie Shen via vinodkv) (Zhijie Shen via vinodkv)
YARN-583. Moved application level local resources to be localized under the
filecache sub-directory under application directory. (Omkar Vinit Joshi via
vinodkv)
YARN-581. Added a test to verify that app delegation tokens are restored
after RM restart. (Jian He via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES
@ -277,6 +287,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-594. Update test and add comments in YARN-534 (Jian He via bikas) YARN-594. Update test and add comments in YARN-534 (Jian He via bikas)
YARN-549. YarnClient.submitApplication should wait for application to be
accepted by the RM (Zhijie Shen via bikas)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -37,30 +37,6 @@ public interface ApplicationConstants {
public static final String APPLICATION_CLIENT_SECRET_ENV_NAME = public static final String APPLICATION_CLIENT_SECRET_ENV_NAME =
"AppClientSecretEnv"; "AppClientSecretEnv";
/**
* The environment variable for CONTAINER_ID. Set in AppMaster environment
* only
*/
public static final String AM_CONTAINER_ID_ENV = "AM_CONTAINER_ID";
/**
* The environment variable for the NM_HOST. Set in the AppMaster environment
* only
*/
public static final String NM_HOST_ENV = "NM_HOST";
/**
* The environment variable for the NM_PORT. Set in the AppMaster environment
* only
*/
public static final String NM_PORT_ENV = "NM_PORT";
/**
* The environment variable for the NM_HTTP_PORT. Set in the AppMaster environment
* only
*/
public static final String NM_HTTP_PORT_ENV = "NM_HTTP_PORT";
/** /**
* The environment variable for APP_SUBMIT_TIME. Set in AppMaster environment * The environment variable for APP_SUBMIT_TIME. Set in AppMaster environment
* only * only
@ -70,8 +46,6 @@ public interface ApplicationConstants {
public static final String CONTAINER_TOKEN_FILE_ENV_NAME = public static final String CONTAINER_TOKEN_FILE_ENV_NAME =
UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION; UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
public static final String LOCAL_DIR_ENV = "YARN_LOCAL_DIRS";
/** /**
* The environmental variable for APPLICATION_WEB_PROXY_BASE. Set in * The environmental variable for APPLICATION_WEB_PROXY_BASE. Set in
* ApplicationMaster's environment only. This states that for all non-relative * ApplicationMaster's environment only. This states that for all non-relative
@ -177,7 +151,37 @@ public interface ApplicationConstants {
/** /**
* $HADOOP_YARN_HOME * $HADOOP_YARN_HOME
*/ */
HADOOP_YARN_HOME("HADOOP_YARN_HOME"); HADOOP_YARN_HOME("HADOOP_YARN_HOME"),
/**
* $CONTAINER_ID
* Final, exported by NodeManager and non-modifiable by users.
*/
CONTAINER_ID("CONTAINER_ID"),
/**
* $NM_HOST
* Final, exported by NodeManager and non-modifiable by users.
*/
NM_HOST("NM_HOST"),
/**
* $NM_HTTP_PORT
* Final, exported by NodeManager and non-modifiable by users.
*/
NM_HTTP_PORT("NM_HTTP_PORT"),
/**
* $NM_PORT
* Final, exported by NodeManager and non-modifiable by users.
*/
NM_PORT("NM_PORT"),
/**
* $LOCAL_DIRS
* Final, exported by NodeManager and non-modifiable by users.
*/
LOCAL_DIRS("LOCAL_DIRS");
private final String variable; private final String variable;
private Environment(String variable) { private Environment(String variable) {

View File

@ -96,7 +96,9 @@ public interface ClientRMProtocol {
* *
* <p>Currently the <code>ResourceManager</code> sends an immediate (empty) * <p>Currently the <code>ResourceManager</code> sends an immediate (empty)
* {@link SubmitApplicationResponse} on accepting the submission and throws * {@link SubmitApplicationResponse} on accepting the submission and throws
* an exception if it rejects the submission.</p> * an exception if it rejects the submission. However, this call needs to be
* followed by {@link #getApplicationReport(GetApplicationReportRequest)}
* to make sure that the application gets properly submitted.</p>
* *
* <p> In secure mode,the <code>ResourceManager</code> verifies access to * <p> In secure mode,the <code>ResourceManager</code> verifies access to
* queues etc. before accepting the application submission.</p> * queues etc. before accepting the application submission.</p>

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.ContainerExitStatus; import org.apache.hadoop.yarn.api.ContainerExitStatus;
import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManager;
@ -320,7 +321,7 @@ public class ApplicationMaster {
Map<String, String> envs = System.getenv(); Map<String, String> envs = System.getenv();
if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) { if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
if (cliParser.hasOption("app_attempt_id")) { if (cliParser.hasOption("app_attempt_id")) {
String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
@ -330,7 +331,7 @@ public class ApplicationMaster {
} }
} else { } else {
ContainerId containerId = ConverterUtils.toContainerId(envs ContainerId containerId = ConverterUtils.toContainerId(envs
.get(ApplicationConstants.AM_CONTAINER_ID_ENV)); .get(Environment.CONTAINER_ID.name()));
appAttemptID = containerId.getApplicationAttemptId(); appAttemptID = containerId.getApplicationAttemptId();
} }
@ -338,16 +339,16 @@ public class ApplicationMaster {
throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
+ " not set in the environment"); + " not set in the environment");
} }
if (!envs.containsKey(ApplicationConstants.NM_HOST_ENV)) { if (!envs.containsKey(Environment.NM_HOST.name())) {
throw new RuntimeException(ApplicationConstants.NM_HOST_ENV throw new RuntimeException(Environment.NM_HOST.name()
+ " not set in the environment"); + " not set in the environment");
} }
if (!envs.containsKey(ApplicationConstants.NM_HTTP_PORT_ENV)) { if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
throw new RuntimeException(ApplicationConstants.NM_HTTP_PORT_ENV throw new RuntimeException(Environment.NM_HTTP_PORT
+ " not set in the environment"); + " not set in the environment");
} }
if (!envs.containsKey(ApplicationConstants.NM_PORT_ENV)) { if (!envs.containsKey(Environment.NM_PORT.name())) {
throw new RuntimeException(ApplicationConstants.NM_PORT_ENV throw new RuntimeException(Environment.NM_PORT.name()
+ " not set in the environment"); + " not set in the environment");
} }

View File

@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -58,7 +59,7 @@ import org.apache.hadoop.yarn.util.Records;
* creates a new application on the RM and negotiates a new attempt id. Then it * creates a new application on the RM and negotiates a new attempt id. Then it
* waits for the RM app state to reach be YarnApplicationState.ACCEPTED after * waits for the RM app state to reach be YarnApplicationState.ACCEPTED after
* which it spawns the AM in another process and passes it the container id via * which it spawns the AM in another process and passes it the container id via
* env variable ApplicationConstants.AM_CONTAINER_ID_ENV. The AM can be in any * env variable Environment.CONTAINER_ID. The AM can be in any
* language. The AM can register with the RM using the attempt id obtained * language. The AM can register with the RM using the attempt id obtained
* from the container id and proceed as normal. * from the container id and proceed as normal.
* The client redirects app stdout and stderr to its own stdout and * The client redirects app stdout and stderr to its own stdout and
@ -190,10 +191,11 @@ public class UnmanagedAMLauncher {
containerId.setId(0); containerId.setId(0);
String hostname = InetAddress.getLocalHost().getHostName(); String hostname = InetAddress.getLocalHost().getHostName();
envAMList.add(ApplicationConstants.AM_CONTAINER_ID_ENV + "=" + containerId); envAMList.add(Environment.CONTAINER_ID.name() + "=" + containerId);
envAMList.add(ApplicationConstants.NM_HOST_ENV + "=" + hostname); envAMList.add(Environment.NM_HOST.name() + "=" + hostname);
envAMList.add(ApplicationConstants.NM_HTTP_PORT_ENV + "=0"); envAMList.add(Environment.NM_HTTP_PORT.name() + "=0");
envAMList.add(ApplicationConstants.NM_PORT_ENV + "=0"); envAMList.add(Environment.NM_PORT.name() + "=0");
envAMList.add(Environment.LOCAL_DIRS.name() + "= /tmp");
envAMList.add(ApplicationConstants.APP_SUBMIT_TIME_ENV + "=" envAMList.add(ApplicationConstants.APP_SUBMIT_TIME_ENV + "="
+ System.currentTimeMillis()); + System.currentTimeMillis());

View File

@ -63,7 +63,9 @@ public interface YarnClient extends Service {
/** /**
* <p> * <p>
* Submit a new application to <code>YARN.</code> * Submit a new application to <code>YARN.</code> It is a blocking call, such
* that it will not return {@link ApplicationId} until the submitted
* application has been submitted and accepted by the ResourceManager.
* </p> * </p>
* *
* @param appContext * @param appContext

View File

@ -53,9 +53,11 @@ import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -68,6 +70,7 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
protected ClientRMProtocol rmClient; protected ClientRMProtocol rmClient;
protected InetSocketAddress rmAddress; protected InetSocketAddress rmAddress;
protected long statePollIntervalMillis;
private static final String ROOT = "root"; private static final String ROOT = "root";
@ -90,6 +93,9 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
if (this.rmAddress == null) { if (this.rmAddress == null) {
this.rmAddress = getRmAddress(conf); this.rmAddress = getRmAddress(conf);
} }
statePollIntervalMillis = conf.getLong(
YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS);
super.init(conf); super.init(conf);
} }
@ -131,6 +137,29 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
Records.newRecord(SubmitApplicationRequest.class); Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(appContext); request.setApplicationSubmissionContext(appContext);
rmClient.submitApplication(request); rmClient.submitApplication(request);
int pollCount = 0;
while (true) {
YarnApplicationState state =
getApplicationReport(applicationId).getYarnApplicationState();
if (!state.equals(YarnApplicationState.NEW) &&
!state.equals(YarnApplicationState.NEW_SAVING)) {
break;
}
// Notify the client through the log every 10 poll, in case the client
// is blocked here too long.
if (++pollCount % 10 == 0) {
LOG.info("Application submission is not finished, " +
"submitted application " + applicationId +
" is still in " + state);
}
try {
Thread.sleep(statePollIntervalMillis);
} catch (InterruptedException ie) {
}
}
LOG.info("Submitted application " + applicationId + " to ResourceManager" LOG.info("Submitted application " + applicationId + " to ResourceManager"
+ " at " + rmAddress); + " at " + rmAddress);
return applicationId; return applicationId;

View File

@ -18,10 +18,25 @@
package org.apache.hadoop.yarn.client; package org.apache.hadoop.yarn.client;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.client.YarnClient; import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.client.YarnClientImpl; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test; import org.junit.Test;
public class TestYarnClient { public class TestYarnClient {
@ -43,4 +58,76 @@ public class TestYarnClient {
client.start(); client.start();
client.stop(); client.stop();
} }
@Test (timeout = 30000)
public void testSubmitApplication() {
Configuration conf = new Configuration();
conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
100); // speed up tests
final YarnClient client = new MockYarnClient();
client.init(conf);
client.start();
YarnApplicationState[] exitStates = new YarnApplicationState[]
{
YarnApplicationState.SUBMITTED,
YarnApplicationState.ACCEPTED,
YarnApplicationState.RUNNING,
YarnApplicationState.FINISHED,
YarnApplicationState.FAILED,
YarnApplicationState.KILLED
};
for (int i = 0; i < exitStates.length; ++i) {
ApplicationSubmissionContext context =
mock(ApplicationSubmissionContext.class);
ApplicationId applicationId = Records.newRecord(ApplicationId.class);
applicationId.setClusterTimestamp(System.currentTimeMillis());
applicationId.setId(i);
when(context.getApplicationId()).thenReturn(applicationId);
((MockYarnClient) client).setYarnApplicationState(exitStates[i]);
try {
client.submitApplication(context);
} catch (YarnRemoteException e) {
Assert.fail("Exception is not expected.");
}
verify(((MockYarnClient) client).mockReport,times(4 * i + 4))
.getYarnApplicationState();
}
client.stop();
}
private static class MockYarnClient extends YarnClientImpl {
private ApplicationReport mockReport;
public MockYarnClient() {
super();
}
@Override
public void start() {
rmClient = mock(ClientRMProtocol.class);
GetApplicationReportResponse mockResponse =
mock(GetApplicationReportResponse.class);
mockReport = mock(ApplicationReport.class);
try{
when(rmClient.getApplicationReport(any(
GetApplicationReportRequest.class))).thenReturn(mockResponse);
} catch (YarnRemoteException e) {
Assert.fail("Exception is not expected.");
}
when(mockResponse.getApplicationReport()).thenReturn(mockReport);
}
@Override
public void stop() {
}
public void setYarnApplicationState(YarnApplicationState state) {
when(mockReport.getYarnApplicationState()).thenReturn(
YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
YarnApplicationState.NEW_SAVING, state);
}
}
} }

View File

@ -692,6 +692,19 @@ public class YarnConfiguration extends Configuration {
*/ */
public static boolean DEFAULT_YARN_MINICLUSTER_FIXED_PORTS = false; public static boolean DEFAULT_YARN_MINICLUSTER_FIXED_PORTS = false;
////////////////////////////////
// Other Configs
////////////////////////////////
/**
* The interval of the yarn client's querying application state after
* application submission. The unit is millisecond.
*/
public static final String YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS =
YARN_PREFIX + "client.app-submission.poll-interval";
public static final long DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS =
1000;
public YarnConfiguration() { public YarnConfiguration() {
super(); super();
} }

View File

@ -708,4 +708,12 @@
<value>$HADOOP_CONF_DIR,$HADOOP_COMMON_HOME/share/hadoop/common/*,$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,$HADOOP_YARN_HOME/share/hadoop/yarn/*,$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*</value> <value>$HADOOP_CONF_DIR,$HADOOP_COMMON_HOME/share/hadoop/common/*,$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,$HADOOP_YARN_HOME/share/hadoop/yarn/*,$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*</value>
</property> </property>
<!-- Other configuration -->
<property>
<description>The interval of the yarn client's querying application state
after application submission. The unit is millisecond.</description>
<name>yarn.client.app-submission.poll-interval</name>
<value>1000</value>
</property>
</configuration> </configuration>

View File

@ -113,13 +113,13 @@ public class DefaultContainerExecutor extends ContainerExecutor {
List<String> localDirs, List<String> logDirs) throws IOException { List<String> localDirs, List<String> logDirs) throws IOException {
FsPermission dirPerm = new FsPermission(APPDIR_PERM); FsPermission dirPerm = new FsPermission(APPDIR_PERM);
ContainerId containerId = container.getContainerID(); ContainerId containerId = container.getContainer().getId();
// create container dirs on all disks // create container dirs on all disks
String containerIdStr = ConverterUtils.toString(containerId); String containerIdStr = ConverterUtils.toString(containerId);
String appIdStr = String appIdStr =
ConverterUtils.toString( ConverterUtils.toString(
container.getContainerID().getApplicationAttemptId(). containerId.getApplicationAttemptId().
getApplicationId()); getApplicationId());
for (String sLocalDir : localDirs) { for (String sLocalDir : localDirs) {
Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE); Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);

View File

@ -216,11 +216,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
String user, String appId, Path containerWorkDir, String user, String appId, Path containerWorkDir,
List<String> localDirs, List<String> logDirs) throws IOException { List<String> localDirs, List<String> logDirs) throws IOException {
ContainerId containerId = container.getContainerID(); ContainerId containerId = container.getContainer().getId();
String containerIdStr = ConverterUtils.toString(containerId); String containerIdStr = ConverterUtils.toString(containerId);
resourcesHandler.preExecute(containerId, resourcesHandler.preExecute(containerId,
container.getResource()); container.getContainer().getResource());
String resourcesOptions = resourcesHandler.getResourcesOption( String resourcesOptions = resourcesHandler.getResourcesOption(
containerId); containerId);

View File

@ -36,8 +36,8 @@ public class ApplicationContainerInitEvent extends ApplicationEvent {
final Container container; final Container container;
public ApplicationContainerInitEvent(Container container) { public ApplicationContainerInitEvent(Container container) {
super(container.getContainerID().getApplicationAttemptId().getApplicationId(), super(container.getContainer().getId().getApplicationAttemptId()
ApplicationEventType.INIT_CONTAINER); .getApplicationId(), ApplicationEventType.INIT_CONTAINER);
this.container = container; this.container = container;
} }

View File

@ -274,14 +274,14 @@ public class ApplicationImpl implements Application {
ApplicationContainerInitEvent initEvent = ApplicationContainerInitEvent initEvent =
(ApplicationContainerInitEvent) event; (ApplicationContainerInitEvent) event;
Container container = initEvent.getContainer(); Container container = initEvent.getContainer();
app.containers.put(container.getContainerID(), container); app.containers.put(container.getContainer().getId(), container);
LOG.info("Adding " + container.getContainerID() LOG.info("Adding " + container.getContainer().getId()
+ " to application " + app.toString()); + " to application " + app.toString());
switch (app.getApplicationState()) { switch (app.getApplicationState()) {
case RUNNING: case RUNNING:
app.dispatcher.getEventHandler().handle(new ContainerInitEvent( app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
container.getContainerID())); container.getContainer().getId()));
break; break;
case INITING: case INITING:
case NEW: case NEW:
@ -302,7 +302,7 @@ public class ApplicationImpl implements Application {
// Start all the containers waiting for ApplicationInit // Start all the containers waiting for ApplicationInit
for (Container container : app.containers.values()) { for (Container container : app.containers.values()) {
app.dispatcher.getEventHandler().handle(new ContainerInitEvent( app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
container.getContainerID())); container.getContainer().getId()));
} }
} }
} }

View File

@ -25,12 +25,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
public interface Container extends EventHandler<ContainerEvent> { public interface Container extends EventHandler<ContainerEvent> {
org.apache.hadoop.yarn.api.records.ContainerId getContainerID(); org.apache.hadoop.yarn.api.records.Container getContainer();
String getUser(); String getUser();
@ -46,5 +45,4 @@ public interface Container extends EventHandler<ContainerEvent> {
String toString(); String toString();
Resource getResource();
} }

View File

@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
@ -312,16 +311,6 @@ public class ContainerImpl implements Container {
} }
} }
@Override
public ContainerId getContainerID() {
this.readLock.lock();
try {
return this.container.getId();
} finally {
this.readLock.unlock();
}
}
@Override @Override
public String getUser() { public String getUser() {
this.readLock.lock(); this.readLock.lock();
@ -385,10 +374,10 @@ public class ContainerImpl implements Container {
} }
@Override @Override
public Resource getResource() { public org.apache.hadoop.yarn.api.records.Container getContainer() {
this.readLock.lock(); this.readLock.lock();
try { try {
return this.container.getResource(); return this.container;
} finally { } finally {
this.readLock.unlock(); this.readLock.unlock();
} }

View File

@ -118,7 +118,7 @@ public class ContainerLaunch implements Callable<Integer> {
final ContainerLaunchContext launchContext = container.getLaunchContext(); final ContainerLaunchContext launchContext = container.getLaunchContext();
final Map<Path,List<String>> localResources = final Map<Path,List<String>> localResources =
container.getLocalizedResources(); container.getLocalizedResources();
ContainerId containerID = container.getContainerID(); ContainerId containerID = container.getContainer().getId();
String containerIdStr = ConverterUtils.toString(containerID); String containerIdStr = ConverterUtils.toString(containerID);
final String user = launchContext.getUser(); final String user = launchContext.getUser();
final List<String> command = launchContext.getCommands(); final List<String> command = launchContext.getCommands();
@ -299,7 +299,7 @@ public class ContainerLaunch implements Callable<Integer> {
* @throws IOException * @throws IOException
*/ */
public void cleanupContainer() throws IOException { public void cleanupContainer() throws IOException {
ContainerId containerId = container.getContainerID(); ContainerId containerId = container.getContainer().getId();
String containerIdStr = ConverterUtils.toString(containerId); String containerIdStr = ConverterUtils.toString(containerId);
LOG.info("Cleaning up container " + containerIdStr); LOG.info("Cleaning up container " + containerIdStr);
@ -370,7 +370,7 @@ public class ContainerLaunch implements Callable<Integer> {
*/ */
private String getContainerPid(Path pidFilePath) throws Exception { private String getContainerPid(Path pidFilePath) throws Exception {
String containerIdStr = String containerIdStr =
ConverterUtils.toString(container.getContainerID()); ConverterUtils.toString(container.getContainer().getId());
String processId = null; String processId = null;
LOG.debug("Accessing pid for container " + containerIdStr LOG.debug("Accessing pid for container " + containerIdStr
+ " from pid file " + pidFilePath); + " from pid file " + pidFilePath);
@ -547,6 +547,21 @@ public class ContainerLaunch implements Callable<Integer> {
* Non-modifiable environment variables * Non-modifiable environment variables
*/ */
environment.put(Environment.CONTAINER_ID.name(), container
.getContainer().getId().toString());
environment.put(Environment.NM_PORT.name(),
String.valueOf(container.getContainer().getNodeId().getPort()));
environment.put(Environment.NM_HOST.name(), container.getContainer()
.getNodeId().getHost());
environment.put(Environment.NM_HTTP_PORT.name(), container.getContainer()
.getNodeHttpAddress().split(":")[1]);
environment.put(Environment.LOCAL_DIRS.name(),
StringUtils.join(",", appDirs));
putEnvIfNotNull(environment, Environment.USER.name(), container.getUser()); putEnvIfNotNull(environment, Environment.USER.name(), container.getUser());
putEnvIfNotNull(environment, putEnvIfNotNull(environment,
@ -566,11 +581,6 @@ public class ContainerLaunch implements Callable<Integer> {
Environment.HADOOP_CONF_DIR.name(), Environment.HADOOP_CONF_DIR.name(),
System.getenv(Environment.HADOOP_CONF_DIR.name()) System.getenv(Environment.HADOOP_CONF_DIR.name())
); );
putEnvIfNotNull(environment,
ApplicationConstants.LOCAL_DIR_ENV,
StringUtils.join(",", appDirs)
);
if (!Shell.WINDOWS) { if (!Shell.WINDOWS) {
environment.put("JVM_PID", "$$"); environment.put("JVM_PID", "$$");

View File

@ -111,7 +111,7 @@ public class ContainersLauncher extends AbstractService
public void handle(ContainersLauncherEvent event) { public void handle(ContainersLauncherEvent event) {
// TODO: ContainersLauncher launches containers one by one!! // TODO: ContainersLauncher launches containers one by one!!
Container container = event.getContainer(); Container container = event.getContainer();
ContainerId containerId = container.getContainerID(); ContainerId containerId = container.getContainer().getId();
switch (event.getType()) { switch (event.getType()) {
case LAUNCH_CONTAINER: case LAUNCH_CONTAINER:
Application app = Application app =

View File

@ -27,6 +27,7 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
@ -46,7 +47,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -65,6 +65,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -358,13 +359,15 @@ public class ResourceLocalizationService extends CompositeService
ContainerLocalizationRequestEvent rsrcReqs) { ContainerLocalizationRequestEvent rsrcReqs) {
Container c = rsrcReqs.getContainer(); Container c = rsrcReqs.getContainer();
LocalizerContext ctxt = new LocalizerContext( LocalizerContext ctxt = new LocalizerContext(
c.getUser(), c.getContainerID(), c.getCredentials()); c.getUser(), c.getContainer().getId(), c.getCredentials());
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs = Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
rsrcReqs.getRequestedResources(); rsrcReqs.getRequestedResources();
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e : for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
rsrcs.entrySet()) { rsrcs.entrySet()) {
LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), LocalResourcesTracker tracker =
c.getContainerID().getApplicationAttemptId().getApplicationId()); getLocalResourcesTracker(e.getKey(), c.getUser(),
c.getContainer().getId().getApplicationAttemptId()
.getApplicationId());
for (LocalResourceRequest req : e.getValue()) { for (LocalResourceRequest req : e.getValue()) {
tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt)); tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
} }
@ -393,19 +396,21 @@ public class ResourceLocalizationService extends CompositeService
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e : for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
rsrcs.entrySet()) { rsrcs.entrySet()) {
LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
c.getContainerID().getApplicationAttemptId().getApplicationId()); c.getContainer().getId().getApplicationAttemptId()
.getApplicationId());
for (LocalResourceRequest req : e.getValue()) { for (LocalResourceRequest req : e.getValue()) {
tracker.handle(new ResourceReleaseEvent(req, c.getContainerID())); tracker.handle(new ResourceReleaseEvent(req,
c.getContainer().getId()));
} }
} }
String locId = ConverterUtils.toString(c.getContainerID()); String locId = ConverterUtils.toString(c.getContainer().getId());
localizerTracker.cleanupPrivLocalizers(locId); localizerTracker.cleanupPrivLocalizers(locId);
// Delete the container directories // Delete the container directories
String userName = c.getUser(); String userName = c.getUser();
String containerIDStr = c.toString(); String containerIDStr = c.toString();
String appIDStr = ConverterUtils.toString( String appIDStr = ConverterUtils.toString(
c.getContainerID().getApplicationAttemptId().getApplicationId()); c.getContainer().getId().getApplicationAttemptId().getApplicationId());
for (String localDir : dirsHandler.getLocalDirs()) { for (String localDir : dirsHandler.getLocalDirs()) {
// Delete the user-owned container-dir // Delete the user-owned container-dir
@ -424,8 +429,9 @@ public class ResourceLocalizationService extends CompositeService
delService.delete(null, containerSysDir, new Path[] {}); delService.delete(null, containerSysDir, new Path[] {});
} }
dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(), dispatcher.getEventHandler().handle(
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); new ContainerEvent(c.getContainer().getId(),
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
} }
@ -481,18 +487,15 @@ public class ResourceLocalizationService extends CompositeService
} }
private String getUserFileCachePath(String user) { private String getUserFileCachePath(String user) {
String path = return StringUtils.join(Path.SEPARATOR, Arrays.asList(".",
"." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR ContainerLocalizer.USERCACHE, user, ContainerLocalizer.FILECACHE));
+ user + Path.SEPARATOR + ContainerLocalizer.FILECACHE;
return path;
} }
private String getUserAppCachePath(String user, String appId) { private String getAppFileCachePath(String user, String appId) {
String path = return StringUtils.join(Path.SEPARATOR, Arrays.asList(".",
"." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId,
+ user + Path.SEPARATOR + ContainerLocalizer.APPCACHE ContainerLocalizer.FILECACHE));
+ Path.SEPARATOR + appId;
return path;
} }
@VisibleForTesting @VisibleForTesting
@ -942,7 +945,7 @@ public class ResourceLocalizationService extends CompositeService
if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only
cacheDirectory = getUserFileCachePath(user); cacheDirectory = getUserFileCachePath(user);
} else {// APPLICATION ONLY } else {// APPLICATION ONLY
cacheDirectory = getUserAppCachePath(user, appId.toString()); cacheDirectory = getAppFileCachePath(user, appId.toString());
} }
Path dirPath = Path dirPath =
dirsHandler.getLocalPathForWrite(cacheDirectory, dirsHandler.getLocalPathForWrite(cacheDirectory,

View File

@ -60,7 +60,7 @@ public class ContainerInfo {
public ContainerInfo(final Context nmContext, final Container container, public ContainerInfo(final Context nmContext, final Container container,
String requestUri, String pathPrefix) { String requestUri, String pathPrefix) {
this.id = container.getContainerID().toString(); this.id = container.getContainer().getId().toString();
this.nodeId = nmContext.getNodeId().toString(); this.nodeId = nmContext.getNodeId().toString();
ContainerStatus containerData = container.cloneAndGetContainerStatus(); ContainerStatus containerData = container.cloneAndGetContainerStatus();
this.exitCode = containerData.getExitStatus(); this.exitCode = containerData.getExitStatus();
@ -74,7 +74,7 @@ public class ContainerInfo {
} }
this.user = container.getUser(); this.user = container.getUser();
Resource res = container.getResource(); Resource res = container.getContainer().getResource();
if (res != null) { if (res != null) {
this.totalMemoryNeededMB = res.getMemory(); this.totalMemoryNeededMB = res.getMemory();
} }

View File

@ -88,10 +88,10 @@ public class DummyContainerManager extends ContainerManagerImpl {
.getRequestedResources().values()) { .getRequestedResources().values()) {
for (LocalResourceRequest req : rc) { for (LocalResourceRequest req : rc) {
LOG.info("DEBUG: " + req + ":" LOG.info("DEBUG: " + req + ":"
+ rsrcReqs.getContainer().getContainerID()); + rsrcReqs.getContainer().getContainer().getId());
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new ContainerResourceLocalizedEvent(rsrcReqs.getContainer() new ContainerResourceLocalizedEvent(rsrcReqs.getContainer()
.getContainerID(), req, new Path("file:///local" .getContainer().getId(), req, new Path("file:///local"
+ req.getPath().toUri().getPath()))); + req.getPath().toUri().getPath())));
} }
} }
@ -101,7 +101,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
((ContainerLocalizationEvent) event).getContainer(); ((ContainerLocalizationEvent) event).getContainer();
// TODO: delete the container dir // TODO: delete the container dir
this.dispatcher.getEventHandler().handle( this.dispatcher.getEventHandler().handle(
new ContainerEvent(container.getContainerID(), new ContainerEvent(container.getContainer().getId(),
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
break; break;
case DESTROY_APPLICATION_RESOURCES: case DESTROY_APPLICATION_RESOURCES:
@ -130,7 +130,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
@Override @Override
public void handle(ContainersLauncherEvent event) { public void handle(ContainersLauncherEvent event) {
Container container = event.getContainer(); Container container = event.getContainer();
ContainerId containerId = container.getContainerID(); ContainerId containerId = container.getContainer().getId();
switch (event.getType()) { switch (event.getType()) {
case LAUNCH_CONTAINER: case LAUNCH_CONTAINER:
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(

View File

@ -186,7 +186,10 @@ public class TestLinuxContainerExecutor {
ContainerLaunchContext context = mock(ContainerLaunchContext.class); ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String,String>(); HashMap<String, String> env = new HashMap<String,String>();
when(container.getContainerID()).thenReturn(cId); org.apache.hadoop.yarn.api.records.Container containerAPI =
mock(org.apache.hadoop.yarn.api.records.Container.class);
when(container.getContainer()).thenReturn(containerAPI);
when(container.getContainer().getId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context); when(container.getLaunchContext()).thenReturn(context);
when(context.getEnvironment()).thenReturn(env); when(context.getEnvironment()).thenReturn(env);

View File

@ -107,7 +107,10 @@ public class TestLinuxContainerExecutorWithMocks {
ContainerLaunchContext context = mock(ContainerLaunchContext.class); ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String,String>(); HashMap<String, String> env = new HashMap<String,String>();
when(container.getContainerID()).thenReturn(cId); org.apache.hadoop.yarn.api.records.Container containerAPI =
mock(org.apache.hadoop.yarn.api.records.Container.class);
when(container.getContainer()).thenReturn(containerAPI);
when(container.getContainer().getId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context); when(container.getLaunchContext()).thenReturn(context);
when(cId.toString()).thenReturn(containerId); when(cId.toString()).thenReturn(containerId);
@ -225,7 +228,10 @@ public class TestLinuxContainerExecutorWithMocks {
ContainerLaunchContext context = mock(ContainerLaunchContext.class); ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String, String>(); HashMap<String, String> env = new HashMap<String, String>();
when(container.getContainerID()).thenReturn(cId); org.apache.hadoop.yarn.api.records.Container containerAPI =
mock(org.apache.hadoop.yarn.api.records.Container.class);
when(container.getContainer()).thenReturn(containerAPI);
when(container.getContainer().getId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context); when(container.getLaunchContext()).thenReturn(context);
when(cId.toString()).thenReturn(containerId); when(cId.toString()).thenReturn(containerId);

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -178,6 +179,9 @@ public class TestNodeManagerShutdown {
Container mockContainer = mock(Container.class); Container mockContainer = mock(Container.class);
when(mockContainer.getId()).thenReturn(cId); when(mockContainer.getId()).thenReturn(cId);
NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
when(mockContainer.getNodeId()).thenReturn(nodeId);
when(mockContainer.getNodeHttpAddress()).thenReturn("localhost:12345");
containerLaunchContext.setUser(user); containerLaunchContext.setUser(user);
URL localResourceUri = URL localResourceUri =

View File

@ -150,6 +150,10 @@ public class TestContainerManager extends BaseContainerManagerTest {
when(mockContainer.getId()).thenReturn(cId); when(mockContainer.getId()).thenReturn(cId);
when(mockContainer.getResource()).thenReturn( when(mockContainer.getResource()).thenReturn(
BuilderUtils.newResource(512, 1)); BuilderUtils.newResource(512, 1));
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
when(mockContainer.getNodeHttpAddress()).thenReturn(
context.getNodeId().getHost() + ":12345");
StartContainerRequest startRequest = StartContainerRequest startRequest =
recordFactory.newRecordInstance(StartContainerRequest.class); recordFactory.newRecordInstance(StartContainerRequest.class);
@ -245,6 +249,10 @@ public class TestContainerManager extends BaseContainerManagerTest {
when(mockContainer.getId()).thenReturn(cId); when(mockContainer.getId()).thenReturn(cId);
when(mockContainer.getResource()).thenReturn( when(mockContainer.getResource()).thenReturn(
BuilderUtils.newResource(100, 1)); // MB BuilderUtils.newResource(100, 1)); // MB
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
when(mockContainer.getNodeHttpAddress()).thenReturn(
context.getNodeId().getHost() + ":12345");
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class); StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext); startRequest.setContainerLaunchContext(containerLaunchContext);
startRequest.setContainer(mockContainer); startRequest.setContainer(mockContainer);
@ -352,7 +360,9 @@ public class TestContainerManager extends BaseContainerManagerTest {
when(mockContainer.getId()).thenReturn(cId); when(mockContainer.getId()).thenReturn(cId);
when(mockContainer.getResource()).thenReturn( when(mockContainer.getResource()).thenReturn(
BuilderUtils.newResource(100, 1)); // MB BuilderUtils.newResource(100, 1)); // MB
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
when(mockContainer.getNodeHttpAddress()).thenReturn(
context.getNodeId().getHost() + ":12345");
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class); StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext); startRequest.setContainerLaunchContext(containerLaunchContext);
startRequest.setContainer(mockContainer); startRequest.setContainer(mockContainer);
@ -444,6 +454,9 @@ public class TestContainerManager extends BaseContainerManagerTest {
when(mockContainer.getId()).thenReturn(cId); when(mockContainer.getId()).thenReturn(cId);
when(mockContainer.getResource()).thenReturn( when(mockContainer.getResource()).thenReturn(
BuilderUtils.newResource(100, 1)); BuilderUtils.newResource(100, 1));
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
when(mockContainer.getNodeHttpAddress()).thenReturn(
context.getNodeId().getHost() + ":12345");
// containerLaunchContext.command = new ArrayList<CharSequence>(); // containerLaunchContext.command = new ArrayList<CharSequence>();

View File

@ -83,7 +83,7 @@ public class TestApplication {
for (int i = 0; i < wa.containers.size(); i++) { for (int i = 0; i < wa.containers.size(); i++) {
verify(wa.containerBus).handle( verify(wa.containerBus).handle(
argThat(new ContainerInitMatcher(wa.containers.get(i) argThat(new ContainerInitMatcher(wa.containers.get(i)
.getContainerID()))); .getContainer().getId())));
} }
} finally { } finally {
if (wa != null) if (wa != null)
@ -108,7 +108,7 @@ public class TestApplication {
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
verify(wa.containerBus).handle( verify(wa.containerBus).handle(
argThat(new ContainerInitMatcher(wa.containers.get(0) argThat(new ContainerInitMatcher(wa.containers.get(0)
.getContainerID()))); .getContainer().getId())));
wa.initContainer(1); wa.initContainer(1);
wa.initContainer(2); wa.initContainer(2);
@ -118,7 +118,7 @@ public class TestApplication {
for (int i = 1; i < wa.containers.size(); i++) { for (int i = 1; i < wa.containers.size(); i++) {
verify(wa.containerBus).handle( verify(wa.containerBus).handle(
argThat(new ContainerInitMatcher(wa.containers.get(i) argThat(new ContainerInitMatcher(wa.containers.get(i)
.getContainerID()))); .getContainer().getId())));
} }
} finally { } finally {
if (wa != null) if (wa != null)
@ -233,7 +233,7 @@ public class TestApplication {
for (int i = 1; i < wa.containers.size(); i++) { for (int i = 1; i < wa.containers.size(); i++) {
verify(wa.containerBus).handle( verify(wa.containerBus).handle(
argThat(new ContainerKillMatcher(wa.containers.get(i) argThat(new ContainerKillMatcher(wa.containers.get(i)
.getContainerID()))); .getContainer().getId())));
} }
wa.containerFinished(1); wa.containerFinished(1);
@ -354,7 +354,7 @@ public class TestApplication {
verify(wa.containerBus).handle( verify(wa.containerBus).handle(
argThat(new ContainerKillMatcher(wa.containers.get(0) argThat(new ContainerKillMatcher(wa.containers.get(0)
.getContainerID()))); .getContainer().getId())));
assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT, assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
wa.app.getApplicationState()); wa.app.getApplicationState());
@ -487,7 +487,7 @@ public class TestApplication {
public void containerFinished(int containerNum) { public void containerFinished(int containerNum) {
app.handle(new ApplicationContainerFinishedEvent(containers.get( app.handle(new ApplicationContainerFinishedEvent(containers.get(
containerNum).getContainerID())); containerNum).getContainer().getId()));
drainDispatcherEvents(); drainDispatcherEvents();
} }
@ -514,7 +514,10 @@ public class TestApplication {
BuilderUtils.newApplicationAttemptId(appId, 1); BuilderUtils.newApplicationAttemptId(appId, 1);
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, containerId); ContainerId cId = BuilderUtils.newContainerId(appAttemptId, containerId);
Container c = mock(Container.class); Container c = mock(Container.class);
when(c.getContainerID()).thenReturn(cId); org.apache.hadoop.yarn.api.records.Container containerAPI =
mock(org.apache.hadoop.yarn.api.records.Container.class);
when(c.getContainer()).thenReturn(containerAPI);
when(c.getContainer().getId()).thenReturn(cId);
ContainerLaunchContext launchContext = mock(ContainerLaunchContext.class); ContainerLaunchContext launchContext = mock(ContainerLaunchContext.class);
when(c.getLaunchContext()).thenReturn(launchContext); when(c.getLaunchContext()).thenReturn(launchContext);
when(launchContext.getApplicationACLs()).thenReturn( when(launchContext.getApplicationACLs()).thenReturn(

View File

@ -376,7 +376,7 @@ public class TestContainer {
public boolean matches(Object o) { public boolean matches(Object o) {
ContainersLauncherEvent evt = (ContainersLauncherEvent) o; ContainersLauncherEvent evt = (ContainersLauncherEvent) o;
return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER
&& wcf.cId == evt.getContainer().getContainerID(); && wcf.cId == evt.getContainer().getContainer().getId();
} }
}; };
verify(wc.launcherBus).handle(argThat(matchesLaunchReq)); verify(wc.launcherBus).handle(argThat(matchesLaunchReq));
@ -639,7 +639,7 @@ public class TestContainer {
Path p = new Path(cache, rsrc.getKey()); Path p = new Path(cache, rsrc.getKey());
localPaths.put(p, Arrays.asList(rsrc.getKey())); localPaths.put(p, Arrays.asList(rsrc.getKey()));
// rsrc copied to p // rsrc copied to p
c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(), c.handle(new ContainerResourceLocalizedEvent(c.getContainer().getId(),
req, p)); req, p));
} }
drainDispatcherEvents(); drainDispatcherEvents();
@ -662,7 +662,8 @@ public class TestContainer {
LocalResource rsrc = localResources.get(rsrcKey); LocalResource rsrc = localResources.get(rsrcKey);
LocalResourceRequest req = new LocalResourceRequest(rsrc); LocalResourceRequest req = new LocalResourceRequest(rsrc);
Exception e = new Exception("Fake localization error"); Exception e = new Exception("Fake localization error");
c.handle(new ContainerResourceFailedEvent(c.getContainerID(), req, e)); c.handle(new ContainerResourceFailedEvent(c.getContainer()
.getId(), req, e));
drainDispatcherEvents(); drainDispatcherEvents();
} }
@ -677,7 +678,7 @@ public class TestContainer {
++counter; ++counter;
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue()); LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
Exception e = new Exception("Fake localization error"); Exception e = new Exception("Fake localization error");
c.handle(new ContainerResourceFailedEvent(c.getContainerID(), c.handle(new ContainerResourceFailedEvent(c.getContainer().getId(),
req, e)); req, e));
} }
drainDispatcherEvents(); drainDispatcherEvents();

View File

@ -37,6 +37,7 @@ import java.util.Map;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@ -51,14 +52,13 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
@ -150,50 +150,17 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
} }
} }
} }
// this is a dirty hack - but should be ok for a unittest.
@SuppressWarnings({ "rawtypes", "unchecked" })
public static void setNewEnvironmentHack(Map<String, String> newenv) throws Exception {
try {
Class<?> cl = Class.forName("java.lang.ProcessEnvironment");
Field field = cl.getDeclaredField("theEnvironment");
field.setAccessible(true);
Map<String, String> env = (Map<String, String>)field.get(null);
env.clear();
env.putAll(newenv);
Field ciField = cl.getDeclaredField("theCaseInsensitiveEnvironment");
ciField.setAccessible(true);
Map<String, String> cienv = (Map<String, String>)ciField.get(null);
cienv.clear();
cienv.putAll(newenv);
} catch (NoSuchFieldException e) {
Class[] classes = Collections.class.getDeclaredClasses();
Map<String, String> env = System.getenv();
for (Class cl : classes) {
if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
Field field = cl.getDeclaredField("m");
field.setAccessible(true);
Object obj = field.get(env);
Map<String, String> map = (Map<String, String>) obj;
map.clear();
map.putAll(newenv);
}
}
}
}
/** /**
* See if environment variable is forwarded using sanitizeEnv. * See if environment variable is forwarded using sanitizeEnv.
* @throws Exception * @throws Exception
*/ */
@Test @Test (timeout = 5000)
public void testContainerEnvVariables() throws Exception { public void testContainerEnvVariables() throws Exception {
containerManager.start(); containerManager.start();
Map<String, String> envWithDummy = new HashMap<String, String>(); ContainerLaunchContext containerLaunchContext =
envWithDummy.putAll(System.getenv()); recordFactory.newRecordInstance(ContainerLaunchContext.class);
envWithDummy.put(Environment.MALLOC_ARENA_MAX.name(), "99");
setNewEnvironmentHack(envWithDummy);
Container mockContainer = mock(Container.class); Container mockContainer = mock(Container.class);
// ////// Construct the Container-id // ////// Construct the Container-id
@ -207,34 +174,54 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
ContainerId cId = ContainerId cId =
recordFactory.newRecordInstance(ContainerId.class); recordFactory.newRecordInstance(ContainerId.class);
cId.setApplicationAttemptId(appAttemptId); cId.setApplicationAttemptId(appAttemptId);
String malloc = System.getenv(Environment.MALLOC_ARENA_MAX.name()); when(mockContainer.getId()).thenReturn(cId);
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
when(mockContainer.getNodeHttpAddress()).thenReturn(
context.getNodeId().getHost() + ":12345");
Map<String, String> userSetEnv = new HashMap<String, String>();
userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id");
userSetEnv.put(Environment.NM_HOST.name(), "user_set_NM_HOST");
userSetEnv.put(Environment.NM_PORT.name(), "user_set_NM_PORT");
userSetEnv.put(Environment.NM_HTTP_PORT.name(), "user_set_NM_HTTP_PORT");
userSetEnv.put(Environment.LOCAL_DIRS.name(), "user_set_LOCAL_DIR");
containerLaunchContext.setUser(user);
containerLaunchContext.setEnvironment(userSetEnv);
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
PrintWriter fileWriter = new PrintWriter(scriptFile); PrintWriter fileWriter = new PrintWriter(scriptFile);
File processStartFile = File processStartFile =
new File(tmpDir, "env_vars.txt").getAbsoluteFile(); new File(tmpDir, "env_vars.txt").getAbsoluteFile();
if (Shell.WINDOWS) { if (Shell.WINDOWS) {
fileWriter.println("@echo " + Environment.MALLOC_ARENA_MAX.$() + "> " + fileWriter.println("@echo " + Environment.CONTAINER_ID.$() + "> "
processStartFile); + processStartFile);
fileWriter.println("@echo " + cId + ">> " + processStartFile); fileWriter.println("@echo " + Environment.NM_HOST.$() + ">> "
+ processStartFile);
fileWriter.println("@echo " + Environment.NM_PORT.$() + ">> "
+ processStartFile);
fileWriter.println("@echo " + Environment.NM_HTTP_PORT.$() + ">> "
+ processStartFile);
fileWriter.println("@echo " + Environment.LOCAL_DIRS.$() + ">> "
+ processStartFile);
fileWriter.println("@ping -n 100 127.0.0.1 >nul"); fileWriter.println("@ping -n 100 127.0.0.1 >nul");
} else { } else {
fileWriter.write("\numask 0"); // So that start file is readable by the test fileWriter.write("\numask 0"); // So that start file is readable by the test
fileWriter.write("\necho " + Environment.MALLOC_ARENA_MAX.$() + " > " + fileWriter.write("\necho $" + Environment.CONTAINER_ID.name() + " > "
processStartFile); + processStartFile);
fileWriter.write("\necho $" + Environment.NM_HOST.name() + " >> "
+ processStartFile);
fileWriter.write("\necho $" + Environment.NM_PORT.name() + " >> "
+ processStartFile);
fileWriter.write("\necho $" + Environment.NM_HTTP_PORT.name() + " >> "
+ processStartFile);
fileWriter.write("\necho $" + Environment.LOCAL_DIRS.name() + " >> "
+ processStartFile);
fileWriter.write("\necho $$ >> " + processStartFile); fileWriter.write("\necho $$ >> " + processStartFile);
fileWriter.write("\nexec sleep 100"); fileWriter.write("\nexec sleep 100");
} }
fileWriter.close(); fileWriter.close();
assert(malloc != null && !"".equals(malloc));
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
when(mockContainer.getId()).thenReturn(cId);
containerLaunchContext.setUser(user);
// upload the script file so that the container can run it // upload the script file so that the container can run it
URL resource_alpha = URL resource_alpha =
ConverterUtils.getYarnUrlFromPath(localFS ConverterUtils.getYarnUrlFromPath(localFS
@ -272,9 +259,40 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
processStartFile.exists()); processStartFile.exists());
// Now verify the contents of the file // Now verify the contents of the file
List<String> localDirs = dirsHandler.getLocalDirs();
List<Path> appDirs = new ArrayList<Path>(localDirs.size());
for (String localDir : localDirs) {
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, user);
Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
appDirs.add(new Path(appsdir, appId.toString()));
}
BufferedReader reader = BufferedReader reader =
new BufferedReader(new FileReader(processStartFile)); new BufferedReader(new FileReader(processStartFile));
Assert.assertEquals(malloc, reader.readLine()); Assert.assertEquals(cId.toString(), reader.readLine());
Assert.assertEquals(mockContainer.getNodeId().getHost(),
reader.readLine());
Assert.assertEquals(String.valueOf(mockContainer.getNodeId().getPort()),
reader.readLine());
Assert.assertEquals(
String.valueOf(mockContainer.getNodeHttpAddress().split(":")[1]),
reader.readLine());
Assert.assertEquals(StringUtils.join(",", appDirs), reader.readLine());
Assert.assertEquals(cId.toString(), containerLaunchContext
.getEnvironment().get(Environment.CONTAINER_ID.name()));
Assert.assertEquals(mockContainer.getNodeId().getHost(),
containerLaunchContext.getEnvironment()
.get(Environment.NM_HOST.name()));
Assert.assertEquals(String.valueOf(mockContainer.getNodeId().getPort()),
containerLaunchContext.getEnvironment().get(
Environment.NM_PORT.name()));
Assert.assertEquals(
mockContainer.getNodeHttpAddress().split(":")[1],
containerLaunchContext.getEnvironment().get(
Environment.NM_HTTP_PORT.name()));
Assert.assertEquals(StringUtils.join(",", appDirs), containerLaunchContext
.getEnvironment().get(Environment.LOCAL_DIRS.name()));
// Get the pid of the process // Get the pid of the process
String pid = reader.readLine().trim(); String pid = reader.readLine().trim();
// No more lines // No more lines
@ -354,6 +372,9 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
ContainerLaunchContext containerLaunchContext = ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class); recordFactory.newRecordInstance(ContainerLaunchContext.class);
when(mockContainer.getId()).thenReturn(cId); when(mockContainer.getId()).thenReturn(cId);
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
when(mockContainer.getNodeHttpAddress()).thenReturn(
context.getNodeId().getHost() + ":12345");
containerLaunchContext.setUser(user); containerLaunchContext.setUser(user);

View File

@ -42,6 +42,7 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
@ -72,6 +73,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -87,6 +89,7 @@ import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@ -491,7 +494,7 @@ public class TestResourceLocalizationService {
Thread.sleep(1000); Thread.sleep(1000);
dispatcher.await(); dispatcher.await();
String appStr = ConverterUtils.toString(appId); String appStr = ConverterUtils.toString(appId);
String ctnrStr = c.getContainerID().toString(); String ctnrStr = c.getContainer().getId().toString();
ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class); ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class);
verify(exec).startLocalizer(tokenPathCaptor.capture(), verify(exec).startLocalizer(tokenPathCaptor.capture(),
isA(InetSocketAddress.class), eq("user0"), eq(appStr), eq(ctnrStr), isA(InetSocketAddress.class), eq("user0"), eq(appStr), eq(ctnrStr),
@ -567,7 +570,7 @@ public class TestResourceLocalizationService {
public boolean matches(Object o) { public boolean matches(Object o) {
ContainerEvent evt = (ContainerEvent) o; ContainerEvent evt = (ContainerEvent) o;
return evt.getType() == ContainerEventType.RESOURCE_LOCALIZED return evt.getType() == ContainerEventType.RESOURCE_LOCALIZED
&& c.getContainerID() == evt.getContainerID(); && c.getContainer().getId() == evt.getContainerID();
} }
}; };
// total 2 resource localzation calls. one for each resource. // total 2 resource localzation calls. one for each resource.
@ -756,11 +759,11 @@ public class TestResourceLocalizationService {
// Container - 1 // Container - 1
ContainerImpl container1 = createMockContainer(user, 1); ContainerImpl container1 = createMockContainer(user, 1);
String localizerId1 = container1.getContainerID().toString(); String localizerId1 = container1.getContainer().getId().toString();
rls.getPrivateLocalizers().put( rls.getPrivateLocalizers().put(
localizerId1, localizerId1,
rls.new LocalizerRunner(new LocalizerContext(user, container1 rls.new LocalizerRunner(new LocalizerContext(user, container1
.getContainerID(), null), localizerId1)); .getContainer().getId(), null), localizerId1));
LocalizerRunner localizerRunner1 = rls.getLocalizerRunner(localizerId1); LocalizerRunner localizerRunner1 = rls.getLocalizerRunner(localizerId1);
dispatcher1.getEventHandler().handle( dispatcher1.getEventHandler().handle(
@ -771,11 +774,11 @@ public class TestResourceLocalizationService {
// Container - 2 now makes the request. // Container - 2 now makes the request.
ContainerImpl container2 = createMockContainer(user, 2); ContainerImpl container2 = createMockContainer(user, 2);
String localizerId2 = container2.getContainerID().toString(); String localizerId2 = container2.getContainer().getId().toString();
rls.getPrivateLocalizers().put( rls.getPrivateLocalizers().put(
localizerId2, localizerId2,
rls.new LocalizerRunner(new LocalizerContext(user, container2 rls.new LocalizerRunner(new LocalizerContext(user, container2
.getContainerID(), null), localizerId2)); .getContainer().getId(), null), localizerId2));
LocalizerRunner localizerRunner2 = rls.getLocalizerRunner(localizerId2); LocalizerRunner localizerRunner2 = rls.getLocalizerRunner(localizerId2);
dispatcher1.getEventHandler().handle( dispatcher1.getEventHandler().handle(
createContainerLocalizationEvent(container2, createContainerLocalizationEvent(container2,
@ -848,6 +851,163 @@ public class TestResourceLocalizationService {
} }
} }
@Test(timeout = 10000)
@SuppressWarnings("unchecked")
public void testLocalResourcePath() throws Exception {
// test the local path where application and user cache files will be
// localized.
DrainDispatcher dispatcher1 = null;
try {
dispatcher1 = new DrainDispatcher();
String user = "testuser";
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
// mocked Resource Localization Service
Configuration conf = new Configuration();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
// We don't want files to be created
doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
anyBoolean());
// creating one local directory
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[1];
for (int i = 0; i < 1; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
// setting log directory.
String logDir =
lfs.makeQualified(new Path(basedir, "logdir ")).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService();
localDirHandler.init(conf);
// Registering event handlers
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
dispatcher1.register(ApplicationEventType.class, applicationBus);
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
dispatcher1.register(ContainerEventType.class, containerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
DeletionService delService = mock(DeletionService.class);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
// initializing directory handler.
dirsHandler.init(conf);
dispatcher1.init(conf);
dispatcher1.start();
ResourceLocalizationService rls =
new ResourceLocalizationService(dispatcher1, exec, delService,
localDirHandler);
dispatcher1.register(LocalizationEventType.class, rls);
rls.init(conf);
rls.handle(createApplicationLocalizationEvent(user, appId));
// We need to pre-populate the LocalizerRunner as the
// Resource Localization Service code internally starts them which
// definitely we don't want.
// creating new container and populating corresponding localizer runner
// Container - 1
Container container1 = createMockContainer(user, 1);
String localizerId1 = container1.getContainer().getId().toString();
rls.getPrivateLocalizers().put(
localizerId1,
rls.new LocalizerRunner(new LocalizerContext(user, container1
.getContainer().getId(), null), localizerId1));
// Creating two requests for container
// 1) Private resource
// 2) Application resource
LocalResourceRequest reqPriv =
new LocalResourceRequest(new Path("file:///tmp1"), 123L,
LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "");
List<LocalResourceRequest> privList =
new ArrayList<LocalResourceRequest>();
privList.add(reqPriv);
LocalResourceRequest reqApp =
new LocalResourceRequest(new Path("file:///tmp2"), 123L,
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, "");
List<LocalResourceRequest> appList =
new ArrayList<LocalResourceRequest>();
appList.add(reqApp);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
rsrcs.put(LocalResourceVisibility.APPLICATION, appList);
rsrcs.put(LocalResourceVisibility.PRIVATE, privList);
dispatcher1.getEventHandler().handle(
new ContainerLocalizationRequestEvent(container1, rsrcs));
// Now waiting for resource download to start. Here actual will not start
// Only the resources will be populated into pending list.
Assert
.assertTrue(waitForPrivateDownloadToStart(rls, localizerId1, 2, 500));
// Validating user and application cache paths
String userCachePath =
StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0)
.toUri().getRawPath(), ContainerLocalizer.USERCACHE, user,
ContainerLocalizer.FILECACHE));
String userAppCachePath =
StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0)
.toUri().getRawPath(), ContainerLocalizer.USERCACHE, user,
ContainerLocalizer.APPCACHE, appId.toString(),
ContainerLocalizer.FILECACHE));
// Now the Application and private resources may come in any order
// for download.
// For User cahce :
// returned destinationPath = user cache path + random number
// For App cache :
// returned destinationPath = user app cache path + random number
int returnedResources = 0;
boolean appRsrc = false, privRsrc = false;
while (returnedResources < 2) {
LocalizerHeartbeatResponse response =
rls.heartbeat(createLocalizerStatus(localizerId1));
for (ResourceLocalizationSpec resourceSpec : response
.getResourceSpecs()) {
returnedResources++;
Path destinationDirectory =
new Path(resourceSpec.getDestinationDirectory().getFile());
if (resourceSpec.getResource().getVisibility() ==
LocalResourceVisibility.APPLICATION) {
appRsrc = true;
Assert.assertEquals(userAppCachePath, destinationDirectory
.getParent().toUri().toString());
} else if (resourceSpec.getResource().getVisibility() ==
LocalResourceVisibility.PRIVATE) {
privRsrc = true;
Assert.assertEquals(userCachePath, destinationDirectory.getParent()
.toUri().toString());
} else {
throw new Exception("Unexpected resource recevied.");
}
}
}
// We should receive both the resources (Application and Private)
Assert.assertTrue(appRsrc && privRsrc);
} finally {
if (dispatcher1 != null) {
dispatcher1.stop();
}
}
}
private LocalizerStatus createLocalizerStatusForFailedResource( private LocalizerStatus createLocalizerStatusForFailedResource(
String localizerId, LocalResourceRequest req) { String localizerId, LocalResourceRequest req) {
LocalizerStatus status = createLocalizerStatus(localizerId); LocalizerStatus status = createLocalizerStatus(localizerId);
@ -1154,7 +1314,10 @@ public class TestResourceLocalizationService {
private ContainerImpl createMockContainer(String user, int containerId) { private ContainerImpl createMockContainer(String user, int containerId) {
ContainerImpl container = mock(ContainerImpl.class); ContainerImpl container = mock(ContainerImpl.class);
when(container.getContainerID()).thenReturn( org.apache.hadoop.yarn.api.records.Container c =
mock(org.apache.hadoop.yarn.api.records.Container.class);
when(container.getContainer()).thenReturn(c);
when(container.getContainer().getId()).thenReturn(
BuilderUtils.newContainerId(1, 1, 1, containerId)); BuilderUtils.newContainerId(1, 1, 1, containerId));
when(container.getUser()).thenReturn(user); when(container.getUser()).thenReturn(user);
Credentials mockCredentials = mock(Credentials.class); Credentials mockCredentials = mock(Credentials.class);
@ -1194,8 +1357,11 @@ public class TestResourceLocalizationService {
ApplicationAttemptId appAttemptId = ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1); BuilderUtils.newApplicationAttemptId(appId, 1);
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id); ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id);
org.apache.hadoop.yarn.api.records.Container containerAPI =
mock(org.apache.hadoop.yarn.api.records.Container.class);
when(c.getContainer()).thenReturn(containerAPI);
when(c.getUser()).thenReturn("user0"); when(c.getUser()).thenReturn("user0");
when(c.getContainerID()).thenReturn(cId); when(c.getContainer().getId()).thenReturn(cId);
Credentials creds = new Credentials(); Credentials creds = new Credentials();
creds.addToken(new Text("tok" + id), getToken(id)); creds.addToken(new Text("tok" + id), getToken(id));
when(c.getCredentials()).thenReturn(creds); when(c.getCredentials()).thenReturn(creds);

View File

@ -213,6 +213,9 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
cId.setApplicationAttemptId(appAttemptId); cId.setApplicationAttemptId(appAttemptId);
when(mockContainer.getId()).thenReturn(cId); when(mockContainer.getId()).thenReturn(cId);
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
when(mockContainer.getNodeHttpAddress()).thenReturn(
context.getNodeId().getHost() + ":12345");
containerLaunchContext.setUser(user); containerLaunchContext.setUser(user);
URL resource_alpha = URL resource_alpha =

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import static org.mockito.Mockito.*;
public class MockContainer implements Container { public class MockContainer implements Container {
@ -48,6 +49,7 @@ public class MockContainer implements Container {
private final Map<Path, List<String>> resource = private final Map<Path, List<String>> resource =
new HashMap<Path, List<String>>(); new HashMap<Path, List<String>>();
private RecordFactory recordFactory; private RecordFactory recordFactory;
private org.apache.hadoop.yarn.api.records.Container mockContainer;
public MockContainer(ApplicationAttemptId appAttemptId, public MockContainer(ApplicationAttemptId appAttemptId,
Dispatcher dispatcher, Configuration conf, String user, Dispatcher dispatcher, Configuration conf, String user,
@ -62,17 +64,14 @@ public class MockContainer implements Container {
launchContext.setUser(user); launchContext.setUser(user);
this.state = ContainerState.NEW; this.state = ContainerState.NEW;
mockContainer = mock(org.apache.hadoop.yarn.api.records.Container.class);
when(mockContainer.getId()).thenReturn(id);
} }
public void setState(ContainerState state) { public void setState(ContainerState state) {
this.state = state; this.state = state;
} }
@Override
public ContainerId getContainerID() {
return id;
}
@Override @Override
public String getUser() { public String getUser() {
return user; return user;
@ -119,8 +118,7 @@ public class MockContainer implements Container {
} }
@Override @Override
public Resource getResource() { public org.apache.hadoop.yarn.api.records.Container getContainer() {
return null; return this.mockContainer;
} }
} }

View File

@ -185,16 +185,18 @@ public class TestNMWebServicesApps extends JerseyTest {
app.getUser(), app.getAppId(), 1); app.getUser(), app.getAppId(), 1);
Container container2 = new MockContainer(appAttemptId, dispatcher, conf, Container container2 = new MockContainer(appAttemptId, dispatcher, conf,
app.getUser(), app.getAppId(), 2); app.getUser(), app.getAppId(), 2);
nmContext.getContainers().put(container1.getContainerID(), container1); nmContext.getContainers()
nmContext.getContainers().put(container2.getContainerID(), container2); .put(container1.getContainer().getId(), container1);
nmContext.getContainers()
.put(container2.getContainer().getId(), container2);
app.getContainers().put(container1.getContainerID(), container1); app.getContainers().put(container1.getContainer().getId(), container1);
app.getContainers().put(container2.getContainerID(), container2); app.getContainers().put(container2.getContainer().getId(), container2);
HashMap<String, String> hash = new HashMap<String, String>(); HashMap<String, String> hash = new HashMap<String, String>();
hash.put(container1.getContainerID().toString(), container1 hash.put(container1.getContainer().getId().toString(), container1
.getContainerID().toString()); .getContainer().getId().toString());
hash.put(container2.getContainerID().toString(), container2 hash.put(container2.getContainer().getId().toString(), container2
.getContainerID().toString()); .getContainer().getId().toString());
return hash; return hash;
} }

View File

@ -186,16 +186,18 @@ public class TestNMWebServicesContainers extends JerseyTest {
app.getUser(), app.getAppId(), 1); app.getUser(), app.getAppId(), 1);
Container container2 = new MockContainer(appAttemptId, dispatcher, conf, Container container2 = new MockContainer(appAttemptId, dispatcher, conf,
app.getUser(), app.getAppId(), 2); app.getUser(), app.getAppId(), 2);
nmContext.getContainers().put(container1.getContainerID(), container1); nmContext.getContainers()
nmContext.getContainers().put(container2.getContainerID(), container2); .put(container1.getContainer().getId(), container1);
nmContext.getContainers()
.put(container2.getContainer().getId(), container2);
app.getContainers().put(container1.getContainerID(), container1); app.getContainers().put(container1.getContainer().getId(), container1);
app.getContainers().put(container2.getContainerID(), container2); app.getContainers().put(container2.getContainer().getId(), container2);
HashMap<String, String> hash = new HashMap<String, String>(); HashMap<String, String> hash = new HashMap<String, String>();
hash.put(container1.getContainerID().toString(), container1 hash.put(container1.getContainer().getId().toString(), container1
.getContainerID().toString()); .getContainer().getId().toString());
hash.put(container2.getContainerID().toString(), container2 hash.put(container2.getContainer().getId().toString(), container2
.getContainerID().toString()); .getContainer().getId().toString());
return hash; return hash;
} }
@ -468,7 +470,7 @@ public class TestNMWebServicesContainers extends JerseyTest {
String state, String user, int exitCode, String diagnostics, String state, String user, int exitCode, String diagnostics,
String nodeId, int totalMemoryNeededMB, String logsLink) String nodeId, int totalMemoryNeededMB, String logsLink)
throws JSONException, Exception { throws JSONException, Exception {
WebServicesTestUtils.checkStringMatch("id", cont.getContainerID() WebServicesTestUtils.checkStringMatch("id", cont.getContainer().getId()
.toString(), id); .toString(), id);
WebServicesTestUtils.checkStringMatch("state", cont.getContainerState() WebServicesTestUtils.checkStringMatch("state", cont.getContainerState()
.toString(), state); .toString(), state);
@ -481,8 +483,9 @@ public class TestNMWebServicesContainers extends JerseyTest {
WebServicesTestUtils.checkStringMatch("nodeId", nmContext.getNodeId() WebServicesTestUtils.checkStringMatch("nodeId", nmContext.getNodeId()
.toString(), nodeId); .toString(), nodeId);
assertEquals("totalMemoryNeededMB wrong", 0, totalMemoryNeededMB); assertEquals("totalMemoryNeededMB wrong", 0, totalMemoryNeededMB);
String shortLink = ujoin("containerlogs", cont.getContainerID().toString(), String shortLink =
cont.getUser()); ujoin("containerlogs", cont.getContainer().getId().toString(),
cont.getUser());
assertTrue("containerLogsLink wrong", logsLink.contains(shortLink)); assertTrue("containerLogsLink wrong", logsLink.contains(shortLink));
} }

View File

@ -178,17 +178,7 @@ public class AMLauncher implements Runnable {
Map<String, String> environment = container.getEnvironment(); Map<String, String> environment = container.getEnvironment();
environment.put(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV, environment.put(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV,
application.getWebProxyBase()); application.getWebProxyBase());
// Set the AppAttemptId, containerId, NMHTTPAdress, AppSubmitTime to be // Set AppSubmitTime and MaxAppAttempts to be consumable by the AM.
// consumable by the AM.
environment.put(ApplicationConstants.AM_CONTAINER_ID_ENV,
containerID.toString());
environment.put(ApplicationConstants.NM_HOST_ENV, masterContainer
.getNodeId().getHost());
environment.put(ApplicationConstants.NM_PORT_ENV,
String.valueOf(masterContainer.getNodeId().getPort()));
String parts[] =
masterContainer.getNodeHttpAddress().split(":");
environment.put(ApplicationConstants.NM_HTTP_PORT_ENV, parts[1]);
ApplicationId applicationId = ApplicationId applicationId =
application.getAppAttemptId().getApplicationId(); application.getAppAttemptId().getApplicationId();
environment.put( environment.put(

View File

@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Service to renew application delegation tokens. * Service to renew application delegation tokens.
*/ */
@ -139,7 +141,8 @@ public class DelegationTokenRenewer extends AbstractService {
* class that is used for keeping tracks of DT to renew * class that is used for keeping tracks of DT to renew
* *
*/ */
private static class DelegationTokenToRenew { @VisibleForTesting
protected static class DelegationTokenToRenew {
public final Token<?> token; public final Token<?> token;
public final ApplicationId applicationId; public final ApplicationId applicationId;
public final Configuration conf; public final Configuration conf;
@ -252,7 +255,16 @@ public class DelegationTokenRenewer extends AbstractService {
private void addTokenToList(DelegationTokenToRenew t) { private void addTokenToList(DelegationTokenToRenew t) {
delegationTokens.add(t); delegationTokens.add(t);
} }
@VisibleForTesting
public Set<Token<?>> getDelegationTokens() {
Set<Token<?>> tokens = new HashSet<Token<?>>();
for(DelegationTokenToRenew delegationToken : delegationTokens) {
tokens.add(delegationToken.token);
}
return tokens;
}
/** /**
* Add application tokens for renewal. * Add application tokens for renewal.
* @param applicationId added application * @param applicationId added application
@ -343,7 +355,8 @@ public class DelegationTokenRenewer extends AbstractService {
/** /**
* set task to renew the token * set task to renew the token
*/ */
private void setTimerForTokenRenewal(DelegationTokenToRenew token) @VisibleForTesting
protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
throws IOException { throws IOException {
// calculate timer time // calculate timer time
@ -358,7 +371,8 @@ public class DelegationTokenRenewer extends AbstractService {
} }
// renew a token // renew a token
private void renewToken(final DelegationTokenToRenew dttr) @VisibleForTesting
protected void renewToken(final DelegationTokenToRenew dttr)
throws IOException { throws IOException {
// need to use doAs so that http can find the kerberos tgt // need to use doAs so that http can find the kerberos tgt
// NOTE: token renewers should be responsible for the correct UGI! // NOTE: token renewers should be responsible for the correct UGI!

View File

@ -18,12 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.Map; import java.util.Map;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
@ -41,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@ -130,26 +134,26 @@ public class MockRM extends ResourceManager {
public RMApp submitApp(int masterMemory, String name, String user) throws Exception { public RMApp submitApp(int masterMemory, String name, String user) throws Exception {
return submitApp(masterMemory, name, user, null, false, null, return submitApp(masterMemory, name, user, null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
} }
public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls) throws Exception { Map<ApplicationAccessType, String> acls) throws Exception {
return submitApp(masterMemory, name, user, acls, false, null, return submitApp(masterMemory, name, user, acls, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
} }
public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, String queue) throws Exception { Map<ApplicationAccessType, String> acls, String queue) throws Exception {
return submitApp(masterMemory, name, user, acls, false, queue, return submitApp(masterMemory, name, user, acls, false, queue,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
} }
public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue, Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts) throws Exception { int maxAppAttempts, Credentials ts) throws Exception {
ClientRMProtocol client = getClientRMService(); ClientRMProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class)); .newRecord(GetNewApplicationRequest.class));
@ -175,6 +179,12 @@ public class MockRM extends ResourceManager {
sub.setResource(capability); sub.setResource(capability);
clc.setApplicationACLs(acls); clc.setApplicationACLs(acls);
clc.setUser(user); clc.setUser(user);
if (ts != null && UserGroupInformation.isSecurityEnabled()) {
DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
clc.setContainerTokens(securityTokens);
}
sub.setAMContainerSpec(clc); sub.setAMContainerSpec(clc);
req.setApplicationSubmissionContext(sub); req.setApplicationSubmissionContext(sub);
UserGroupInformation fakeUser = UserGroupInformation fakeUser =
@ -357,6 +367,10 @@ public class MockRM extends ResourceManager {
return this.nodesListManager; return this.nodesListManager;
} }
public RMDelegationTokenSecretManager getRMDTSecretManager() {
return this.rmDTSecretManager;
}
@Override @Override
protected void startWepApp() { protected void startWepApp() {
// override to disable webapp // override to disable webapp

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@ -71,17 +70,17 @@ public class TestApplicationMasterLauncher {
launched = true; launched = true;
Map<String, String> env = Map<String, String> env =
request.getContainerLaunchContext().getEnvironment(); request.getContainerLaunchContext().getEnvironment();
containerIdAtContainerManager =
env.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
ContainerId containerId = ContainerId containerId =
ConverterUtils.toContainerId(containerIdAtContainerManager); request.getContainer().getId();
containerIdAtContainerManager = containerId.toString();
attemptIdAtContainerManager = attemptIdAtContainerManager =
containerId.getApplicationAttemptId().toString(); containerId.getApplicationAttemptId().toString();
nmHostAtContainerManager = env.get(ApplicationConstants.NM_HOST_ENV); nmHostAtContainerManager = request.getContainer().getNodeId().getHost();
nmPortAtContainerManager = nmPortAtContainerManager =
Integer.parseInt(env.get(ApplicationConstants.NM_PORT_ENV)); request.getContainer().getNodeId().getPort();
nmHttpPortAtContainerManager = nmHttpPortAtContainerManager =
Integer.parseInt(env.get(ApplicationConstants.NM_HTTP_PORT_ENV)); Integer.parseInt(request.getContainer().getNodeHttpAddress()
.split(":")[1]);
submitTimeAtContainerManager = submitTimeAtContainerManager =
Long.parseLong(env.get(ApplicationConstants.APP_SUBMIT_TIME_ENV)); Long.parseLong(env.get(ApplicationConstants.APP_SUBMIT_TIME_ENV));
maxAppAttempts = maxAppAttempts =

View File

@ -18,11 +18,21 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@ -33,9 +43,11 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@ -43,6 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
@ -60,10 +74,8 @@ public class TestRMRestart {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
"org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore"); conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
conf.set(YarnConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@ -159,7 +171,7 @@ public class TestRMRestart {
// create unmanaged app // create unmanaged app
RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true, RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true,
null, conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, null, conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
ApplicationAttemptId unmanagedAttemptId = ApplicationAttemptId unmanagedAttemptId =
appUnmanaged.getCurrentAppAttempt().getAppAttemptId(); appUnmanaged.getCurrentAppAttempt().getAppAttemptId();
// assert appUnmanaged info is saved // assert appUnmanaged info is saved
@ -321,8 +333,7 @@ public class TestRMRestart {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
"org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@ -340,10 +351,12 @@ public class TestRMRestart {
// submit an app with maxAppAttempts equals to 1 // submit an app with maxAppAttempts equals to 1
RMApp app1 = rm1.submitApp(200, "name", "user", RMApp app1 = rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", 1); new HashMap<ApplicationAccessType, String>(), false, "default", 1,
null);
// submit an app with maxAppAttempts equals to -1 // submit an app with maxAppAttempts equals to -1
RMApp app2 = rm1.submitApp(200, "name", "user", RMApp app2 = rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", -1); new HashMap<ApplicationAccessType, String>(), false, "default", -1,
null);
// assert app1 info is saved // assert app1 info is saved
ApplicationState appState = rmAppState.get(app1.getApplicationId()); ApplicationState appState = rmAppState.get(app1.getApplicationId());
@ -389,4 +402,113 @@ public class TestRMRestart {
rm1.stop(); rm1.stop();
rm2.stop(); rm2.stop();
} }
@Test
public void testTokenRestoredOnRMrestart() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
MockRM rm1 = new MyMockRM(conf, memStore);
rm1.start();
HashSet<Token<RMDelegationTokenIdentifier>> tokenSet =
new HashSet<Token<RMDelegationTokenIdentifier>>();
// create an empty credential
Credentials ts = new Credentials();
// create tokens and add into credential
Text userText1 = new Text("user1");
RMDelegationTokenIdentifier dtId1 =
new RMDelegationTokenIdentifier(userText1, new Text("renewer1"),
userText1);
Token<RMDelegationTokenIdentifier> token1 =
new Token<RMDelegationTokenIdentifier>(dtId1,
rm1.getRMDTSecretManager());
ts.addToken(userText1, token1);
tokenSet.add(token1);
Text userText2 = new Text("user2");
RMDelegationTokenIdentifier dtId2 =
new RMDelegationTokenIdentifier(userText2, new Text("renewer2"),
userText2);
Token<RMDelegationTokenIdentifier> token2 =
new Token<RMDelegationTokenIdentifier>(dtId2,
rm1.getRMDTSecretManager());
ts.addToken(userText2, token2);
tokenSet.add(token2);
// submit an app with customized credential
RMApp app = rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", 1, ts);
// assert app info is saved
ApplicationState appState = rmAppState.get(app.getApplicationId());
Assert.assertNotNull(appState);
// assert delegation tokens are saved
DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
Assert.assertEquals(securityTokens, appState
.getApplicationSubmissionContext().getAMContainerSpec()
.getContainerTokens());
// start new RM
MockRM rm2 = new MyMockRM(conf, memStore);
rm2.start();
// verify tokens are properly populated back to DelegationTokenRenewer
Assert.assertEquals(tokenSet, rm1.getRMContext()
.getDelegationTokenRenewer().getDelegationTokens());
// stop the RM
rm1.stop();
rm2.stop();
}
class MyMockRM extends MockRM {
public MyMockRM(Configuration conf, RMStateStore store) {
super(conf, store);
}
@Override
protected void doSecureLogin() throws IOException {
// Do nothing.
}
@Override
protected DelegationTokenRenewer createDelegationTokenRenewer() {
return new DelegationTokenRenewer() {
@Override
protected void renewToken(final DelegationTokenToRenew dttr)
throws IOException {
// Do nothing
}
@Override
protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
throws IOException {
// Do nothing
}
};
}
}
} }