Merge r1400738 through r1401868 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1401869 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-10-24 20:42:03 +00:00
commit 14d8d0a670
52 changed files with 991 additions and 251 deletions

View File

@ -62,6 +62,13 @@
<include>**/*</include>
</includes>
</fileSet>
<fileSet>
<directory>hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf</directory>
<outputDirectory>etc/hadoop</outputDirectory>
<includes>
<include>**/*</include>
</includes>
</fileSet>
<fileSet>
<directory>${basedir}</directory>
<outputDirectory>/share/doc/hadoop/${hadoop.component}</outputDirectory>

View File

@ -375,11 +375,14 @@ Release 2.0.3-alpha - Unreleased
(rkanter via tucu)
HADOOP-8900. BuiltInGzipDecompressor throws IOException - stored gzip size
doesn't match decompressed size. (Slavik Krassovsky via suresh)
doesn't match decompressed size. (Andy Isaacson via suresh)
HADOOP-8948. TestFileUtil.testGetDU fails on Windows due to incorrect
assumption of line separator. (Chris Nauroth via suresh)
HADOOP-8951. RunJar to fail with user-comprehensible error
message if jar missing. (stevel via suresh)
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
@ -1091,6 +1094,12 @@ Release 0.23.5 - UNRELEASED
HADOOP-8906. paths with multiple globs are unreliable. (Daryn Sharp via
jlowe)
HADOOP-8811. Compile hadoop native library in FreeBSD (Radim Kolar via
bobby)
HADOOP-8962. RawLocalFileSystem.listStatus fails when a child filename
contains a colon (jlowe via bobby)
Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -67,6 +67,9 @@ macro(set_find_shared_library_version LVERS)
IF(${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
# Mac OS uses .dylib
SET(CMAKE_FIND_LIBRARY_SUFFIXES ".${LVERS}.dylib")
ELSEIF(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
# FreeBSD has always .so installed.
SET(CMAKE_FIND_LIBRARY_SUFFIXES ".so")
ELSEIF(${CMAKE_SYSTEM_NAME} MATCHES "Windows")
# Windows doesn't support finding shared libraries by version.
ELSE()
@ -95,8 +98,10 @@ GET_FILENAME_COMPONENT(HADOOP_ZLIB_LIBRARY ${ZLIB_LIBRARIES} NAME)
INCLUDE(CheckFunctionExists)
INCLUDE(CheckCSourceCompiles)
INCLUDE(CheckLibraryExists)
CHECK_FUNCTION_EXISTS(sync_file_range HAVE_SYNC_FILE_RANGE)
CHECK_FUNCTION_EXISTS(posix_fadvise HAVE_POSIX_FADVISE)
CHECK_LIBRARY_EXISTS(dl dlopen "" NEED_LINK_DL)
SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES CMAKE_FIND_LIBRARY_SUFFIXES)
set_find_shared_library_version("1")
@ -159,6 +164,9 @@ add_dual_library(hadoop
${D}/util/NativeCrc32.c
${D}/util/bulk_crc32.c
)
if (NEED_LINK_DL)
set(LIB_DL dl)
endif (NEED_LINK_DL)
IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
#
@ -171,7 +179,7 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
ENDIF()
target_link_dual_libraries(hadoop
dl
${LIB_DL}
${JAVA_JVM_LIBRARY}
)
SET(LIBHADOOP_VERSION "1.0.0")

View File

@ -43,7 +43,8 @@ public class HardLink {
OS_TYPE_UNIX,
OS_TYPE_WINXP,
OS_TYPE_SOLARIS,
OS_TYPE_MAC
OS_TYPE_MAC,
OS_TYPE_FREEBSD
}
public static OSType osType;
@ -63,7 +64,7 @@ public class HardLink {
getHardLinkCommand = new HardLinkCGUnix();
//override getLinkCountCommand for the particular Unix variant
//Linux is already set as the default - {"stat","-c%h", null}
if (osType == OSType.OS_TYPE_MAC) {
if (osType == OSType.OS_TYPE_MAC || osType == OSType.OS_TYPE_FREEBSD) {
String[] linkCountCmdTemplate = {"/usr/bin/stat","-f%l", null};
HardLinkCGUnix.setLinkCountCmdTemplate(linkCountCmdTemplate);
} else if (osType == OSType.OS_TYPE_SOLARIS) {
@ -95,6 +96,9 @@ public class HardLink {
else if (osName.contains("Mac")) {
return OSType.OS_TYPE_MAC;
}
else if (osName.contains("FreeBSD")) {
return OSType.OS_TYPE_FREEBSD;
}
else {
return OSType.OS_TYPE_UNIX;
}

View File

@ -350,7 +350,7 @@ public class RawLocalFileSystem extends FileSystem {
new RawLocalFileStatus(localf, getDefaultBlockSize(f), this) };
}
String[] names = localf.list();
File[] names = localf.listFiles();
if (names == null) {
return null;
}
@ -358,7 +358,7 @@ public class RawLocalFileSystem extends FileSystem {
int j = 0;
for (int i = 0; i < names.length; i++) {
try {
results[j] = getFileStatus(new Path(f, names[i]));
results[j] = getFileStatus(new Path(names[i].getAbsolutePath()));
j++;
} catch (FileNotFoundException e) {
// ignore the files not found since the dir list may have have changed

View File

@ -126,6 +126,10 @@ public class RunJar {
int firstArg = 0;
String fileName = args[firstArg++];
File file = new File(fileName);
if (!file.exists() || !file.isFile()) {
System.err.println("Not a valid JAR: " + file.getCanonicalPath());
System.exit(-1);
}
String mainClassName = null;
JarFile jarFile;

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
/**
* Provides equivalent behavior to String.intern() to optimize performance,
* whereby does not consume memory in the permanent generation.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class StringInterner {
/**
* Retains a strong reference to each string instance it has interned.
*/
private final static Interner<String> strongInterner;
/**
* Retains a weak reference to each string instance it has interned.
*/
private final static Interner<String> weakInterner;
static {
strongInterner = Interners.newStrongInterner();
weakInterner = Interners.newWeakInterner();
}
/**
* Interns and returns a reference to the representative instance
* for any of a collection of string instances that are equal to each other.
* Retains strong reference to the instance,
* thus preventing it from being garbage-collected.
*
* @param sample string instance to be interned
* @return strong reference to interned string instance
*/
public static String strongIntern(String sample) {
return strongInterner.intern(sample);
}
/**
* Interns and returns a reference to the representative instance
* for any of a collection of string instances that are equal to each other.
* Retains weak reference to the instance,
* and so does not prevent it from being garbage-collected.
*
* @param sample string instance to be interned
* @return weak reference to interned string instance
*/
public static String weakIntern(String sample) {
return weakInterner.intern(sample);
}
}

View File

@ -254,7 +254,11 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_posix_1fadvise(
int err = 0;
if ((err = posix_fadvise(fd, (off_t)offset, (off_t)len, flags))) {
#ifdef __FreeBSD__
throw_ioe(env, errno);
#else
throw_ioe(env, err);
#endif
}
#endif
}
@ -310,6 +314,22 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_sync_1file_1range(
#endif
}
#ifdef __FreeBSD__
static int toFreeBSDFlags(int flags)
{
int rc = flags & 03;
if ( flags & 0100 ) rc |= O_CREAT;
if ( flags & 0200 ) rc |= O_EXCL;
if ( flags & 0400 ) rc |= O_NOCTTY;
if ( flags & 01000 ) rc |= O_TRUNC;
if ( flags & 02000 ) rc |= O_APPEND;
if ( flags & 04000 ) rc |= O_NONBLOCK;
if ( flags &010000 ) rc |= O_SYNC;
if ( flags &020000 ) rc |= O_ASYNC;
return rc;
}
#endif
/*
* public static native FileDescriptor open(String path, int flags, int mode);
*/
@ -318,6 +338,9 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_open(
JNIEnv *env, jclass clazz, jstring j_path,
jint flags, jint mode)
{
#ifdef __FreeBSD__
flags = toFreeBSDFlags(flags);
#endif
jobject ret = NULL;
const char *path = (*env)->GetStringUTFChars(env, j_path, NULL);
@ -399,7 +422,7 @@ err:
* Determine how big a buffer we need for reentrant getpwuid_r and getgrnam_r
*/
ssize_t get_pw_buflen() {
size_t ret = 0;
long ret = 0;
#ifdef _SC_GETPW_R_SIZE_MAX
ret = sysconf(_SC_GETPW_R_SIZE_MAX);
#endif

View File

@ -46,6 +46,7 @@ JNIEXPORT jobjectArray JNICALL
Java_org_apache_hadoop_security_JniBasedUnixGroupsNetgroupMapping_getUsersForNetgroupJNI
(JNIEnv *env, jobject jobj, jstring jgroup) {
UserList *userListHead = NULL;
UserList *current = NULL;
int userListSize = 0;
// pointers to free at the end
@ -72,8 +73,10 @@ Java_org_apache_hadoop_security_JniBasedUnixGroupsNetgroupMapping_getUsersForNet
// was successful or not (as long as it was called we need to call
// endnetgrent)
setnetgrentCalledFlag = 1;
#ifndef __FreeBSD__
if(setnetgrent(cgroup) == 1) {
UserList *current = NULL;
#endif
current = NULL;
// three pointers are for host, user, domain, we only care
// about user now
char *p[3];
@ -87,7 +90,9 @@ Java_org_apache_hadoop_security_JniBasedUnixGroupsNetgroupMapping_getUsersForNet
userListSize++;
}
}
#ifndef __FreeBSD__
}
#endif
//--------------------------------------------------
// build return data (java array)
@ -101,7 +106,7 @@ Java_org_apache_hadoop_security_JniBasedUnixGroupsNetgroupMapping_getUsersForNet
goto END;
}
UserList * current = NULL;
current = NULL;
// note that the loop iterates over list but also over array (i)
int i = 0;

View File

@ -78,7 +78,7 @@ int getGroupIDList(const char *user, int *ngroups, gid_t **groups) {
*/
int getGroupDetails(gid_t group, char **grpBuf) {
struct group * grp = NULL;
size_t currBufferSize = sysconf(_SC_GETGR_R_SIZE_MAX);
long currBufferSize = sysconf(_SC_GETGR_R_SIZE_MAX);
if (currBufferSize < 1024) {
currBufferSize = 1024;
}
@ -123,7 +123,7 @@ int getGroupDetails(gid_t group, char **grpBuf) {
*/
int getPW(const char *user, char **pwbuf) {
struct passwd *pwbufp = NULL;
size_t currBufferSize = sysconf(_SC_GETPW_R_SIZE_MAX);
long currBufferSize = sysconf(_SC_GETPW_R_SIZE_MAX);
if (currBufferSize < 1024) {
currBufferSize = 1024;
}

View File

@ -32,7 +32,9 @@
#include "bulk_crc32.h"
#include "gcc_optimizations.h"
#ifndef __FreeBSD__
#define USE_PIPELINED
#endif
#define CRC_INITIAL_VAL 0xffffffff
@ -260,7 +262,7 @@ static uint32_t crc32_zlib_sb8(
// Begin code for SSE4.2 specific hardware support of CRC32C
///////////////////////////////////////////////////////////////////////////
#if (defined(__amd64__) || defined(__i386)) && defined(__GNUC__)
#if (defined(__amd64__) || defined(__i386)) && defined(__GNUC__) && !defined(__FreeBSD__)
# define SSE42_FEATURE_BIT (1 << 20)
# define CPUID_FEATURES 1
/**

View File

@ -364,8 +364,12 @@ public class TestHardLink {
callCount = createHardLinkMult(src, fileNames, tgt_mult, maxLength);
//check the request was completed in exactly two "chunks"
assertEquals(2, callCount);
String[] tgt_multNames = tgt_mult.list();
//sort directory listings before comparsion
Arrays.sort(fileNames);
Arrays.sort(tgt_multNames);
//and check the results were as expected in the dir tree
assertTrue(Arrays.deepEquals(fileNames, tgt_mult.list()));
assertArrayEquals(fileNames, tgt_multNames);
//Test the case where maxlength is too small even for one filename.
//It should go ahead and try the single files.
@ -382,8 +386,12 @@ public class TestHardLink {
maxLength);
//should go ahead with each of the three single file names
assertEquals(3, callCount);
//check the results were as expected in the dir tree
assertTrue(Arrays.deepEquals(fileNames, tgt_mult.list()));
tgt_multNames = tgt_mult.list();
//sort directory listings before comparsion
Arrays.sort(fileNames);
Arrays.sort(tgt_multNames);
//and check the results were as expected in the dir tree
assertArrayEquals(fileNames, tgt_multNames);
}
/*

View File

@ -249,6 +249,7 @@ public class TestLocalFileSystem {
assertEquals(1, fileSchemeCount);
}
@Test
public void testHasFileDescriptor() throws IOException {
Configuration conf = new Configuration();
LocalFileSystem fs = FileSystem.getLocal(conf);
@ -258,4 +259,17 @@ public class TestLocalFileSystem {
new RawLocalFileSystem().new LocalFSFileInputStream(path), 1024);
assertNotNull(bis.getFileDescriptor());
}
@Test
public void testListStatusWithColons() throws IOException {
Configuration conf = new Configuration();
LocalFileSystem fs = FileSystem.getLocal(conf);
File colonFile = new File(TEST_ROOT_DIR, "foo:bar");
colonFile.mkdirs();
colonFile.createNewFile();
FileStatus[] stats = fs.listStatus(new Path(TEST_ROOT_DIR));
assertEquals("Unexpected number of stats", 1, stats.length);
assertEquals("Bad path from stat", colonFile.getAbsolutePath(),
stats[0].getPath().toUri().getPath());
}
}

View File

@ -224,7 +224,10 @@ public class TestNativeIO {
// we should just skip the unit test on machines where we don't
// have fadvise support
assumeTrue(false);
} finally {
} catch (NativeIOException nioe) {
// ignore this error as FreeBSD returns EBADF even if length is zero
}
finally {
fis.close();
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import static org.junit.Assert.*;
import static org.apache.hadoop.util.StringInterner.*;
import org.junit.Test;
/**
*
* Tests string interning {@link StringInterner}
*/
public class TestStringInterner {
/**
* Test different references are returned for any of string
* instances that are equal to each other but not interned.
*/
@Test
public void testNoIntern() {
String literalABC = "ABC";
String substringABC = "ABCDE".substring(0,3);
String heapABC = new String("ABC");
assertNotSame(literalABC, substringABC);
assertNotSame(literalABC, heapABC);
assertNotSame(substringABC, heapABC);
}
/**
* Test the same strong reference is returned for any
* of string instances that are equal to each other.
*/
@Test
public void testStrongIntern() {
String strongInternLiteralABC = strongIntern("ABC");
String strongInternSubstringABC = strongIntern("ABCDE".substring(0,3));
String strongInternHeapABC = strongIntern(new String("ABC"));
assertSame(strongInternLiteralABC, strongInternSubstringABC);
assertSame(strongInternLiteralABC, strongInternHeapABC);
assertSame(strongInternSubstringABC, strongInternHeapABC);
}
/**
* Test the same weak reference is returned for any
* of string instances that are equal to each other.
*/
@Test
public void testWeakIntern() {
String weakInternLiteralABC = weakIntern("ABC");
String weakInternSubstringABC = weakIntern("ABCDE".substring(0,3));
String weakInternHeapABC = weakIntern(new String("ABC"));
assertSame(weakInternLiteralABC, weakInternSubstringABC);
assertSame(weakInternLiteralABC, weakInternHeapABC);
assertSame(weakInternSubstringABC, weakInternHeapABC);
}
}

View File

@ -230,6 +230,9 @@ Trunk (Unreleased)
HADOOP-8158. Interrupting hadoop fs -put from the command line
causes a LeaseExpiredException. (daryn via harsh)
HDFS-2434. TestNameNodeMetrics.testCorruptBlock fails intermittently.
(Jing Zhao via suresh)
BREAKDOWN OF HDFS-3077 SUBTASKS
HDFS-3077. Quorum-based protocol for reading and writing edit logs.
@ -408,6 +411,8 @@ Release 2.0.3-alpha - Unreleased
HDFS-4088. Remove "throws QuotaExceededException" from an
INodeDirectoryWithQuota constructor. (szetszwo)
HDFS-4099. Clean up replication code and add more javadoc. (szetszwo)
OPTIMIZATIONS
BUG FIXES
@ -480,7 +485,8 @@ Release 2.0.3-alpha - Unreleased
HDFS-4072. On file deletion remove corresponding blocks pending
replications. (Jing Zhao via suresh)
HDFS-4022. Replication not happening for appended block. (Vinay via umamahesh)
HDFS-4022. Replication not happening for appended block.
(Vinay via umamahesh)
Release 2.0.2-alpha - 2012-09-07
@ -1879,6 +1885,9 @@ Release 0.23.5 - UNRELEASED
HDFS-3224. Bug in check for DN re-registration with different storage ID
(jlowe)
HDFS-4090. getFileChecksum() result incompatible when called against
zero-byte files. (Kihwal Lee via daryn)
Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -1769,6 +1769,13 @@ public class DFSClient implements java.io.Closeable {
return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC,
crcPerBlock, fileMD5);
default:
// If there is no block allocated for the file,
// return one with the magic entry that matches what previous
// hdfs versions return.
if (locatedblocks.size() == 0) {
return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5);
}
// we should never get here since the validity was checked
// when getCrcType() was called above.
return null;

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
@ -49,14 +51,11 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import static org.apache.hadoop.util.ExitUtil.terminate;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
@ -2833,28 +2832,32 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
}
}
public void checkReplication(Block block, short numExpectedReplicas) {
// filter out containingNodes that are marked for decommission.
NumberReplicas number = countNodes(block);
if (isNeededReplication(block, numExpectedReplicas, number.liveReplicas())) {
neededReplications.add(block,
number.liveReplicas(),
number.decommissionedReplicas(),
numExpectedReplicas);
return;
}
if (number.liveReplicas() > numExpectedReplicas) {
processOverReplicatedBlock(block, numExpectedReplicas, null, null);
/**
* Check replication of the blocks in the collection.
* If any block is needed replication, insert it into the replication queue.
* Otherwise, if the block is more than the expected replication factor,
* process it as an over replicated block.
*/
public void checkReplication(BlockCollection bc) {
final short expected = bc.getBlockReplication();
for (Block block : bc.getBlocks()) {
final NumberReplicas n = countNodes(block);
if (isNeededReplication(block, expected, n.liveReplicas())) {
neededReplications.add(block, n.liveReplicas(),
n.decommissionedReplicas(), expected);
} else if (n.liveReplicas() > expected) {
processOverReplicatedBlock(block, expected, null, null);
}
}
}
/* get replication factor of a block */
/**
* @return 0 if the block is not found;
* otherwise, return the replication factor of the block.
*/
private int getReplication(Block block) {
BlockCollection bc = blocksMap.getBlockCollection(block);
if (bc == null) { // block does not belong to any file
return 0;
}
return bc.getBlockReplication();
final BlockCollection bc = blocksMap.getBlockCollection(block);
return bc == null? 0: bc.getBlockReplication();
}
@ -2929,12 +2932,12 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
return enoughRacks;
}
boolean isNeededReplication(Block b, int expectedReplication, int curReplicas) {
if ((curReplicas >= expectedReplication) && (blockHasEnoughRacks(b))) {
return false;
} else {
return true;
}
/**
* A block needs replication if the number of replicas is less than expected
* or if it does not have enough racks.
*/
private boolean isNeededReplication(Block b, int expected, int current) {
return current < expected || !blockHasEnoughRacks(b);
}
public long getMissingBlocksCount() {

View File

@ -2433,21 +2433,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return true;
}
/**
* Check all blocks of a file. If any blocks are lower than their intended
* replication factor, then insert them into neededReplication and if
* the blocks are more than the intended replication factor then insert
* them into invalidateBlocks.
*/
private void checkReplicationFactor(INodeFile file) {
short numExpectedReplicas = file.getBlockReplication();
Block[] pendingBlocks = file.getBlocks();
int nrBlocks = pendingBlocks.length;
for (int i = 0; i < nrBlocks; i++) {
blockManager.checkReplication(pendingBlocks[i], numExpectedReplicas);
}
}
/**
* Allocate a block at the given pending filename
*
@ -3180,7 +3165,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// close file and persist block allocations for this file
dir.closeFile(src, newFile);
checkReplicationFactor(newFile);
blockManager.checkReplication(newFile);
}
void commitBlockSynchronization(ExtendedBlock lastblock,

View File

@ -542,6 +542,21 @@ public class TestDistributedFileSystem {
final FileChecksum webhdfs_qfoocs = webhdfs.getFileChecksum(webhdfsqualified);
System.out.println("webhdfs_qfoocs=" + webhdfs_qfoocs);
//create a zero byte file
final Path zeroByteFile = new Path(dir, "zeroByteFile" + n);
{
final FSDataOutputStream out = hdfs.create(zeroByteFile, false, buffer_size,
(short)2, block_size);
out.close();
}
// verify the magic val for zero byte files
{
final FileChecksum zeroChecksum = hdfs.getFileChecksum(zeroByteFile);
assertEquals(zeroChecksum.toString(),
"MD5-of-0MD5-of-0CRC32:70bc8f4b72a86921468bf8e8441dce51");
}
//write another file
final Path bar = new Path(dir, "bar" + n);
{

View File

@ -205,6 +205,12 @@ public class TestNameNodeMetrics {
final Path file = getTestPath("testCorruptBlock");
createFile(file, 100, (short)2);
// Disable the heartbeats, so that no corrupted replica
// can be fixed
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
}
// Corrupt first replica of the block
LocatedBlock block = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), file.toString(), 0, 1).get(0);
@ -215,12 +221,23 @@ public class TestNameNodeMetrics {
} finally {
cluster.getNamesystem().writeUnlock();
}
Thread.sleep(1000); // Wait for block to be marked corrupt
BlockManagerTestUtil.getComputedDatanodeWork(bm);
BlockManagerTestUtil.updateState(bm);
MetricsRecordBuilder rb = getMetrics(NS_METRICS);
assertGauge("CorruptBlocks", 1L, rb);
assertGauge("PendingReplicationBlocks", 1L, rb);
assertGauge("ScheduledReplicationBlocks", 1L, rb);
fs.delete(file, true);
// During the file deletion, both BlockManager#corruptReplicas and
// BlockManager#pendingReplications will be updated, i.e., the records
// for the blocks of the deleted file will be removed from both
// corruptReplicas and pendingReplications. The corresponding
// metrics (CorruptBlocks and PendingReplicationBlocks) will only be updated
// when BlockManager#computeDatanodeWork is run where the
// BlockManager#udpateState is called. And in
// BlockManager#computeDatanodeWork the metric ScheduledReplicationBlocks
// will also be updated.
rb = waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L);
assertGauge("PendingReplicationBlocks", 0L, rb);
assertGauge("ScheduledReplicationBlocks", 0L, rb);

View File

@ -189,6 +189,9 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4736. Remove obsolete option [-rootDir] from TestDFSIO.
(Brandon Li via suresh)
MAPREDUCE-4637. Handle TaskAttempt diagnostic updates while in the NEW and
UNASSIGNED states. (Mayank Bansal via sseth)
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
@ -599,6 +602,14 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many
reducers complete consecutively. (Jason Lowe via vinodkv)
MAPREDUCE-4740. only .jars can be added to the Distributed Cache
classpath. (Robert Joseph Evans via jlowe)
MAPREDUCE-4229. Intern counter names in the JT (Miomir Boljanovic and bobby via daryn)
MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown.
(Vinod Kumar Vavilapalli via jlowe)
Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -107,6 +107,8 @@ import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
/**
* The Map-Reduce Application Master.
* The state machine is encapsulated in the implementation of Job interface.
@ -398,52 +400,65 @@ public class MRAppMaster extends CompositeService {
protected void sysexit() {
System.exit(0);
}
@VisibleForTesting
public void shutDownJob() {
// job has finished
// this is the only job, so shut down the Appmaster
// note in a workflow scenario, this may lead to creation of a new
// job (FIXME?)
// Send job-end notification
if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
try {
LOG.info("Job end notification started for jobID : "
+ job.getReport().getJobId());
JobEndNotifier notifier = new JobEndNotifier();
notifier.setConf(getConfig());
notifier.notify(job.getReport());
} catch (InterruptedException ie) {
LOG.warn("Job end notification interrupted for jobID : "
+ job.getReport().getJobId(), ie);
}
}
// TODO:currently just wait for some time so clients can know the
// final states. Will be removed once RM come on.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//We are finishing cleanly so this is the last retry
isLastAMRetry = true;
// Stop all services
// This will also send the final report to the ResourceManager
LOG.info("Calling stop for all the services");
MRAppMaster.this.stop();
} catch (Throwable t) {
LOG.warn("Graceful stop failed ", t);
}
//Bring the process down by force.
//Not needed after HADOOP-7140
LOG.info("Exiting MR AppMaster..GoodBye!");
sysexit();
}
private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
@Override
public void handle(JobFinishEvent event) {
// job has finished
// this is the only job, so shut down the Appmaster
// note in a workflow scenario, this may lead to creation of a new
// job (FIXME?)
// Send job-end notification
if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
try {
LOG.info("Job end notification started for jobID : "
+ job.getReport().getJobId());
JobEndNotifier notifier = new JobEndNotifier();
notifier.setConf(getConfig());
notifier.notify(job.getReport());
} catch (InterruptedException ie) {
LOG.warn("Job end notification interrupted for jobID : "
+ job.getReport().getJobId(), ie);
// Create a new thread to shutdown the AM. We should not do it in-line
// to avoid blocking the dispatcher itself.
new Thread() {
@Override
public void run() {
shutDownJob();
}
}
// TODO:currently just wait for some time so clients can know the
// final states. Will be removed once RM come on.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//We are finishing cleanly so this is the last retry
isLastAMRetry = true;
// Stop all services
// This will also send the final report to the ResourceManager
LOG.info("Calling stop for all the services");
stop();
} catch (Throwable t) {
LOG.warn("Graceful stop failed ", t);
}
//Bring the process down by force.
//Not needed after HADOOP-7140
LOG.info("Exiting MR AppMaster..GoodBye!");
sysexit();
}.start();
}
}

View File

@ -200,6 +200,10 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_KILL, new KilledTransition())
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
.addTransition(TaskAttemptStateInternal.NEW,
TaskAttemptStateInternal.NEW,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Transitions from the UNASSIGNED state.
.addTransition(TaskAttemptStateInternal.UNASSIGNED,
@ -211,6 +215,10 @@ public abstract class TaskAttemptImpl implements
.addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
TaskAttemptStateInternal.FAILED, true))
.addTransition(TaskAttemptStateInternal.UNASSIGNED,
TaskAttemptStateInternal.UNASSIGNED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Transitions from the ASSIGNED state.
.addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,

View File

@ -30,6 +30,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -81,6 +82,7 @@ public class ContainerLauncherImpl extends AbstractService implements
protected BlockingQueue<ContainerLauncherEvent> eventQueue =
new LinkedBlockingQueue<ContainerLauncherEvent>();
YarnRPC rpc;
private final AtomicBoolean stopped;
private Container getContainer(ContainerLauncherEvent event) {
ContainerId id = event.getContainerID();
@ -237,6 +239,7 @@ public class ContainerLauncherImpl extends AbstractService implements
public ContainerLauncherImpl(AppContext context) {
super(ContainerLauncherImpl.class.getName());
this.context = context;
this.stopped = new AtomicBoolean(false);
}
@Override
@ -271,11 +274,13 @@ public class ContainerLauncherImpl extends AbstractService implements
@Override
public void run() {
ContainerLauncherEvent event = null;
while (!Thread.currentThread().isInterrupted()) {
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
LOG.error("Returning, interrupted : " + e);
if (!stopped.get()) {
LOG.error("Returning, interrupted : " + e);
}
return;
}
int poolSize = launcherPool.getCorePoolSize();
@ -324,6 +329,10 @@ public class ContainerLauncherImpl extends AbstractService implements
}
public void stop() {
if (stopped.getAndSet(true)) {
// return if already stopped
return;
}
// shutdown any containers that might be left running
shutdownAllContainers();
eventHandlingThread.interrupt();

View File

@ -67,7 +67,7 @@ public abstract class RMCommunicator extends AbstractService {
private int rmPollInterval;//millis
protected ApplicationId applicationId;
protected ApplicationAttemptId applicationAttemptId;
private AtomicBoolean stopped;
private final AtomicBoolean stopped;
protected Thread allocatorThread;
@SuppressWarnings("rawtypes")
protected EventHandler eventHandler;
@ -239,7 +239,9 @@ public abstract class RMCommunicator extends AbstractService {
// TODO: for other exceptions
}
} catch (InterruptedException e) {
LOG.warn("Allocated thread interrupted. Returning.");
if (!stopped.get()) {
LOG.warn("Allocated thread interrupted. Returning.");
}
return;
}
}

View File

@ -32,6 +32,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -84,7 +85,7 @@ public class RMContainerAllocator extends RMContainerRequestor
private static final Priority PRIORITY_MAP;
private Thread eventHandlingThread;
private volatile boolean stopEventHandling;
private final AtomicBoolean stopped;
static {
PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
@ -145,6 +146,7 @@ public class RMContainerAllocator extends RMContainerRequestor
public RMContainerAllocator(ClientService clientService, AppContext context) {
super(clientService, context);
this.stopped = new AtomicBoolean(false);
}
@Override
@ -176,11 +178,13 @@ public class RMContainerAllocator extends RMContainerRequestor
ContainerAllocatorEvent event;
while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = RMContainerAllocator.this.eventQueue.take();
} catch (InterruptedException e) {
LOG.error("Returning, interrupted : " + e);
if (!stopped.get()) {
LOG.error("Returning, interrupted : " + e);
}
return;
}
@ -234,7 +238,10 @@ public class RMContainerAllocator extends RMContainerRequestor
@Override
public void stop() {
this.stopEventHandling = true;
if (stopped.getAndSet(true)) {
// return if already stopped
return;
}
eventHandlingThread.interrupt();
super.stop();
LOG.info("Final Stats: " + getStat());

View File

@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -43,10 +44,12 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
private Thread eventHandlingThread;
private BlockingQueue<TaskCleanupEvent> eventQueue =
new LinkedBlockingQueue<TaskCleanupEvent>();
private final AtomicBoolean stopped;
public TaskCleanerImpl(AppContext context) {
super("TaskCleaner");
this.context = context;
this.stopped = new AtomicBoolean(false);
}
public void start() {
@ -59,11 +62,13 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
@Override
public void run() {
TaskCleanupEvent event = null;
while (!Thread.currentThread().isInterrupted()) {
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
LOG.error("Returning, interrupted : " + e);
if (!stopped.get()) {
LOG.error("Returning, interrupted : " + e);
}
return;
}
// the events from the queue are handled in parallel
@ -77,6 +82,10 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
}
public void stop() {
if (stopped.getAndSet(true)) {
// return if already stopped
return;
}
eventHandlingThread.interrupt();
launcherPool.shutdown();
super.stop();

View File

@ -21,17 +21,15 @@ package org.apache.hadoop.mapreduce.v2.app;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import java.io.IOException;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -49,7 +47,6 @@ import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
@ -68,7 +65,6 @@ import org.junit.Test;
private Path stagingJobPath = new Path(stagingJobDir);
private final static RecordFactory recordFactory = RecordFactoryProvider.
getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(TestStagingCleanup.class);
@Test
public void testDeletionofStaging() throws IOException {
@ -86,9 +82,7 @@ import org.junit.Test;
jobid.setAppId(appId);
MRAppMaster appMaster = new TestMRApp(attemptId);
appMaster.init(conf);
EventHandler<JobFinishEvent> handler =
appMaster.createJobFinishEventHandler();
handler.handle(new JobFinishEvent(jobid));
appMaster.shutDownJob();
verify(fs).delete(stagingJobPath, true);
}

View File

@ -546,6 +546,105 @@ public class TestTaskAttempt{
eventHandler.internalError);
}
@Test
public void testAppDiognosticEventOnUnassignedTask() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
Path jobFile = mock(Path.class);
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
when(taListener.getAddress()).thenReturn(
new InetSocketAddress("localhost", 0));
JobConf jobConf = new JobConf();
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
jobConf.setBoolean("fs.file.impl.disable.cache", true);
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
AppContext appCtx = mock(AppContext.class);
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
when(resource.getMemory()).thenReturn(1024);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
mock(Token.class), new Credentials(), new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_SCHEDULE));
taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
"Task got killed"));
assertFalse(
"InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task",
eventHandler.internalError);
}
@Test
public void testAppDiognosticEventOnNewTask() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
Path jobFile = mock(Path.class);
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
when(taListener.getAddress()).thenReturn(
new InetSocketAddress("localhost", 0));
JobConf jobConf = new JobConf();
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
jobConf.setBoolean("fs.file.impl.disable.cache", true);
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
AppContext appCtx = mock(AppContext.class);
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
when(resource.getMemory()).thenReturn(1024);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
mock(Token.class), new Credentials(), new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
"Task got killed"));
assertFalse(
"InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task",
eventHandler.internalError);
}
public static class MockEventHandler implements EventHandler {
public boolean internalError;

View File

@ -191,6 +191,7 @@ public class MRApps extends Apps {
// TODO: Remove duplicates.
}
@SuppressWarnings("deprecation")
public static void setClasspath(Map<String, String> environment,
Configuration conf) throws IOException {
boolean userClassesTakesPrecedence =
@ -218,11 +219,66 @@ public class MRApps extends Apps {
environment,
Environment.CLASSPATH.name(),
Environment.PWD.$() + Path.SEPARATOR + "*");
// a * in the classpath will only find a .jar, so we need to filter out
// all .jars and add everything else
addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf),
DistributedCache.getCacheFiles(conf),
conf,
environment);
addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf),
DistributedCache.getCacheArchives(conf),
conf,
environment);
if (userClassesTakesPrecedence) {
MRApps.setMRFrameworkClasspath(environment, conf);
}
}
/**
* Add the paths to the classpath if they are not jars
* @param paths the paths to add to the classpath
* @param withLinks the corresponding paths that may have a link name in them
* @param conf used to resolve the paths
* @param environment the environment to update CLASSPATH in
* @throws IOException if there is an error resolving any of the paths.
*/
private static void addToClasspathIfNotJar(Path[] paths,
URI[] withLinks, Configuration conf,
Map<String, String> environment) throws IOException {
if (paths != null) {
HashMap<Path, String> linkLookup = new HashMap<Path, String>();
if (withLinks != null) {
for (URI u: withLinks) {
Path p = new Path(u);
FileSystem remoteFS = p.getFileSystem(conf);
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
String name = (null == u.getFragment())
? p.getName() : u.getFragment();
if (!name.toLowerCase().endsWith(".jar")) {
linkLookup.put(p, name);
}
}
}
for (Path p : paths) {
FileSystem remoteFS = p.getFileSystem(conf);
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
String name = linkLookup.get(p);
if (name == null) {
name = p.getName();
}
if(!name.toLowerCase().endsWith(".jar")) {
Apps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
Environment.PWD.$() + Path.SEPARATOR + name);
}
}
}
}
private static final String STAGING_CONSTANT = ".staging";
public static Path getStagingAreaDir(Configuration conf, String user) {
return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR,
@ -261,8 +317,7 @@ public class MRApps extends Apps {
DistributedCache.getCacheArchives(conf),
parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
DistributedCache.getArchiveVisibilities(conf),
DistributedCache.getArchiveClassPaths(conf));
DistributedCache.getArchiveVisibilities(conf));
// Cache files
parseDistributedCacheArtifacts(conf,
@ -271,8 +326,7 @@ public class MRApps extends Apps {
DistributedCache.getCacheFiles(conf),
parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
DistributedCache.getFileVisibilities(conf),
DistributedCache.getFileClassPaths(conf));
DistributedCache.getFileVisibilities(conf));
}
private static String getResourceDescription(LocalResourceType type) {
@ -289,8 +343,8 @@ public class MRApps extends Apps {
Configuration conf,
Map<String, LocalResource> localResources,
LocalResourceType type,
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
Path[] pathsToPutOnClasspath) throws IOException {
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[])
throws IOException {
if (uris != null) {
// Sanity check
@ -304,15 +358,6 @@ public class MRApps extends Apps {
);
}
Map<String, Path> classPaths = new HashMap<String, Path>();
if (pathsToPutOnClasspath != null) {
for (Path p : pathsToPutOnClasspath) {
FileSystem remoteFS = p.getFileSystem(conf);
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
classPaths.put(p.toUri().getPath().toString(), p);
}
}
for (int i = 0; i < uris.length; ++i) {
URI u = uris[i];
Path p = new Path(u);

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.util;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
@ -42,12 +44,36 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
public class TestMRApps {
private static File testWorkDir = null;
@BeforeClass
public static void setupTestDirs() throws IOException {
testWorkDir = new File("target", TestMRApps.class.getCanonicalName());
delete(testWorkDir);
testWorkDir.mkdirs();
testWorkDir = testWorkDir.getAbsoluteFile();
}
@AfterClass
public static void cleanupTestDirs() throws IOException {
if (testWorkDir != null) {
delete(testWorkDir);
}
}
private static void delete(File dir) throws IOException {
Path p = new Path("file://"+dir.getAbsolutePath());
Configuration conf = new Configuration();
FileSystem fs = p.getFileSystem(conf);
fs.delete(p, true);
}
@Test public void testJobIDtoString() {
JobId jid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class);
@ -154,6 +180,28 @@ public class TestMRApps {
}
assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath));
}
@Test public void testSetClasspathWithArchives () throws IOException {
File testTGZ = new File(testWorkDir, "test.tgz");
FileOutputStream out = new FileOutputStream(testTGZ);
out.write(0);
out.close();
Job job = Job.getInstance();
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.CLASSPATH_ARCHIVES, "file://"
+ testTGZ.getAbsolutePath());
conf.set(MRJobConfig.CACHE_ARCHIVES, "file://"
+ testTGZ.getAbsolutePath() + "#testTGZ");
Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, conf);
assertTrue(environment.get("CLASSPATH").startsWith("$PWD:"));
String confClasspath = job.getConfiguration().get(YarnConfiguration.YARN_APPLICATION_CLASSPATH);
if (confClasspath != null) {
confClasspath = confClasspath.replaceAll(",\\s*", ":").trim();
}
assertTrue(environment.get("CLASSPATH").contains(confClasspath));
assertTrue(environment.get("CLASSPATH").contains("testTGZ"));
}
@Test public void testSetClasspathWithUserPrecendence() {
Configuration conf = new Configuration();

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.util.StringInterner;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
@ -170,9 +171,11 @@ public class EventReader implements Closeable {
Counters result = new Counters();
for (JhCounterGroup g : counters.groups) {
CounterGroup group =
result.addGroup(g.name.toString(), g.displayName.toString());
result.addGroup(StringInterner.weakIntern(g.name.toString()),
StringInterner.weakIntern(g.displayName.toString()));
for (JhCounter c : g.counts) {
group.addCounter(c.name.toString(), c.displayName.toString(), c.value);
group.addCounter(StringInterner.weakIntern(c.name.toString()),
StringInterner.weakIntern(c.displayName.toString()), c.value);
}
}
return result;

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -226,10 +227,10 @@ public class JobHistoryParser {
TaskAttemptInfo attemptInfo =
taskInfo.attemptsMap.get(event.getAttemptId());
attemptInfo.finishTime = event.getFinishTime();
attemptInfo.status = event.getTaskStatus();
attemptInfo.state = event.getState();
attemptInfo.status = StringInterner.weakIntern(event.getTaskStatus());
attemptInfo.state = StringInterner.weakIntern(event.getState());
attemptInfo.counters = event.getCounters();
attemptInfo.hostname = event.getHostname();
attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
}
private void handleReduceAttemptFinishedEvent
@ -238,14 +239,14 @@ public class JobHistoryParser {
TaskAttemptInfo attemptInfo =
taskInfo.attemptsMap.get(event.getAttemptId());
attemptInfo.finishTime = event.getFinishTime();
attemptInfo.status = event.getTaskStatus();
attemptInfo.state = event.getState();
attemptInfo.status = StringInterner.weakIntern(event.getTaskStatus());
attemptInfo.state = StringInterner.weakIntern(event.getState());
attemptInfo.shuffleFinishTime = event.getShuffleFinishTime();
attemptInfo.sortFinishTime = event.getSortFinishTime();
attemptInfo.counters = event.getCounters();
attemptInfo.hostname = event.getHostname();
attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
attemptInfo.port = event.getPort();
attemptInfo.rackname = event.getRackName();
attemptInfo.rackname = StringInterner.weakIntern(event.getRackName());
}
private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
@ -253,13 +254,13 @@ public class JobHistoryParser {
TaskAttemptInfo attemptInfo =
taskInfo.attemptsMap.get(event.getAttemptId());
attemptInfo.finishTime = event.getFinishTime();
attemptInfo.status = event.getTaskStatus();
attemptInfo.state = event.getState();
attemptInfo.status = StringInterner.weakIntern(event.getTaskStatus());
attemptInfo.state = StringInterner.weakIntern(event.getState());
attemptInfo.mapFinishTime = event.getMapFinishTime();
attemptInfo.counters = event.getCounters();
attemptInfo.hostname = event.getHostname();
attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
attemptInfo.port = event.getPort();
attemptInfo.rackname = event.getRackName();
attemptInfo.rackname = StringInterner.weakIntern(event.getRackName());
}
private void handleTaskAttemptFailedEvent(
@ -269,10 +270,10 @@ public class JobHistoryParser {
taskInfo.attemptsMap.get(event.getTaskAttemptId());
attemptInfo.finishTime = event.getFinishTime();
attemptInfo.error = event.getError();
attemptInfo.status = event.getTaskStatus();
attemptInfo.hostname = event.getHostname();
attemptInfo.status = StringInterner.weakIntern(event.getTaskStatus());
attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
attemptInfo.port = event.getPort();
attemptInfo.rackname = event.getRackName();
attemptInfo.rackname = StringInterner.weakIntern(event.getRackName());
attemptInfo.shuffleFinishTime = event.getFinishTime();
attemptInfo.sortFinishTime = event.getFinishTime();
attemptInfo.mapFinishTime = event.getFinishTime();
@ -300,7 +301,7 @@ public class JobHistoryParser {
attemptInfo.startTime = event.getStartTime();
attemptInfo.attemptId = event.getTaskAttemptId();
attemptInfo.httpPort = event.getHttpPort();
attemptInfo.trackerName = event.getTrackerName();
attemptInfo.trackerName = StringInterner.weakIntern(event.getTrackerName());
attemptInfo.taskType = event.getTaskType();
attemptInfo.shufflePort = event.getShufflePort();
attemptInfo.containerId = event.getContainerId();
@ -344,7 +345,7 @@ public class JobHistoryParser {
info.finishTime = event.getFinishTime();
info.finishedMaps = event.getFinishedMaps();
info.finishedReduces = event.getFinishedReduces();
info.jobStatus = event.getStatus();
info.jobStatus = StringInterner.weakIntern(event.getStatus());
}
private void handleJobFinishedEvent(JobFinishedEvent event) {
@ -375,7 +376,7 @@ public class JobHistoryParser {
amInfo.appAttemptId = event.getAppAttemptId();
amInfo.startTime = event.getStartTime();
amInfo.containerId = event.getContainerId();
amInfo.nodeManagerHost = event.getNodeManagerHost();
amInfo.nodeManagerHost = StringInterner.weakIntern(event.getNodeManagerHost());
amInfo.nodeManagerPort = event.getNodeManagerPort();
amInfo.nodeManagerHttpPort = event.getNodeManagerHttpPort();
if (info.amInfos == null) {
@ -393,11 +394,11 @@ public class JobHistoryParser {
private void handleJobSubmittedEvent(JobSubmittedEvent event) {
info.jobid = event.getJobId();
info.jobname = event.getJobName();
info.username = event.getUserName();
info.username = StringInterner.weakIntern(event.getUserName());
info.submitTime = event.getSubmitTime();
info.jobConfPath = event.getJobConfPath();
info.jobACLs = event.getJobAcls();
info.jobQueueName = event.getJobQueueName();
info.jobQueueName = StringInterner.weakIntern(event.getJobQueueName());
}
/**

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.counters.AbstractCounters;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils;
/**
@ -235,13 +236,13 @@ public class CountersStrings {
// Get the actual name
String groupName =
getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex);
groupName = unescape(groupName);
StringInterner.weakIntern(getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex));
groupName = StringInterner.weakIntern(unescape(groupName));
// Get the display name
String groupDisplayName =
getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex);
groupDisplayName = unescape(groupDisplayName);
StringInterner.weakIntern(getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex));
groupDisplayName = StringInterner.weakIntern(unescape(groupDisplayName));
// Get the counters
G group = counters.getGroup(groupName);
@ -255,13 +256,13 @@ public class CountersStrings {
// Get the actual name
String counterName =
getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex);
counterName = unescape(counterName);
StringInterner.weakIntern(getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex));
counterName = StringInterner.weakIntern(unescape(counterName));
// Get the display name
String counterDisplayName =
getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex);
counterDisplayName = unescape(counterDisplayName);
StringInterner.weakIntern(getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex));
counterDisplayName = StringInterner.weakIntern(unescape(counterDisplayName));
// Get the value
long value =

View File

@ -86,7 +86,13 @@ Release 2.0.3-alpha - Unreleased
YARN-150. Fixes AppRejectedTransition does not unregister a rejected
app-attempt from the ApplicationMasterService (Bikas Saha via sseth)
YARN-140. Add capacity-scheduler-default.xml to provide a default set of configurations for the capacity scheduler. (ahmed via tucu)
YARN-140. Add capacity-scheduler-default.xml to provide a default set of
configurations for the capacity scheduler. (ahmed via tucu)
YARN-179. Fix some unit test failures. (Vinod Kumar Vavilapalli via sseth)
YARN-181. Fixed eclipse settings broken by capacity-scheduler.xml move via
YARN-140. (Siddharth Seth via vinodkv)
Release 2.0.2-alpha - 2012-09-07
@ -165,6 +171,20 @@ Release 0.23.5 - UNRELEASED
YARN-163. Retrieving container log via NM webapp can hang with multibyte
characters in log (jlowe via bobby)
YARN-174. Modify NodeManager to pass the user's configuration even when
rebooting. (vinodkv)
YARN-177. CapacityScheduler - adding a queue while the RM is running has
wacky results (acmurthy vai tgraves)
YARN-178. Fix custom ProcessTree instance creation (Radim Kolar via bobby)
YARN-180. Capacity scheduler - containers that get reserved create
container token to early (acmurthy and bobby)
YARN-139. Interrupted Exception within AsyncDispatcher leads to user
confusion. (Vinod Kumar Vavilapalli via jlowe)
Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -51,6 +51,12 @@
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>

View File

@ -143,7 +143,7 @@ public class UnmanagedAMLauncher {
appName = cliParser.getOptionValue("appname", "UnmanagedAM");
amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
amQueue = cliParser.getOptionValue("queue", "");
amQueue = cliParser.getOptionValue("queue", "default");
classpath = cliParser.getOptionValue("classpath", null);
amCmd = cliParser.getOptionValue("cmd");

View File

@ -141,6 +141,8 @@ public class TestUnmanagedAMLauncher {
String[] args = {
"--classpath",
classpath,
"--queue",
"default",
"--cmd",
javaHome
+ "/bin/java -Xmx512m "

View File

@ -38,5 +38,12 @@
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
</project>

View File

@ -68,7 +68,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
try {
event = eventQueue.take();
} catch(InterruptedException ie) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
if (event != null) {
@ -180,7 +182,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
try {
eventQueue.put(event);
} catch (InterruptedException e) {
LOG.warn("AsyncDispatcher thread interrupted", e);
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", e);
}
throw new YarnException(e);
}
};

View File

@ -114,6 +114,7 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
* @param procfsDir the root of a proc file system - only used for testing.
*/
public ProcfsBasedProcessTree(String pid, String procfsDir) {
super(pid);
this.pid = getValidPID(pid);
this.procfsDir = procfsDir;
}

View File

@ -21,16 +21,27 @@ package org.apache.hadoop.yarn.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.ReflectionUtils;
import java.lang.reflect.Constructor;
/**
* Interface class to obtain process resource usage
*
*/
public abstract class ResourceCalculatorProcessTree {
public abstract class ResourceCalculatorProcessTree extends Configured {
static final Log LOG = LogFactory
.getLog(ResourceCalculatorProcessTree.class);
/**
* Create process-tree instance with specified root process.
*
* Subclass must override this.
* @param root process-tree root-process
*/
public ResourceCalculatorProcessTree(String root) {
}
/**
* Get the process-tree with latest state. If the root-process is not alive,
* an empty tree will be returned.
@ -122,10 +133,17 @@ public abstract class ResourceCalculatorProcessTree {
* is not available for this system.
*/
public static ResourceCalculatorProcessTree getResourceCalculatorProcessTree(
String pid, Class<? extends ResourceCalculatorProcessTree> clazz, Configuration conf) {
String pid, Class<? extends ResourceCalculatorProcessTree> clazz, Configuration conf) {
if (clazz != null) {
return ReflectionUtils.newInstance(clazz, conf);
try {
Constructor <? extends ResourceCalculatorProcessTree> c = clazz.getConstructor(String.class);
ResourceCalculatorProcessTree rctree = c.newInstance(pid);
rctree.setConf(conf);
return rctree;
} catch(Exception e) {
throw new RuntimeException(e);
}
}
// No class given, try a os specific class

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.hamcrest.core.IsInstanceOf.*;
import static org.hamcrest.core.IsSame.*;
/**
* A JUnit test to test {@link ResourceCalculatorPlugin}
*/
public class TestResourceCalculatorProcessTree {
public static class EmptyProcessTree extends ResourceCalculatorProcessTree {
public EmptyProcessTree(String pid) {
super(pid);
}
public ResourceCalculatorProcessTree getProcessTree() {
return this;
}
public String getProcessTreeDump() {
return "Empty tree for testing";
}
public long getCumulativeRssmem(int age) {
return 0;
}
public long getCumulativeVmem(int age) {
return 0;
}
public long getCumulativeCpuTime() {
return 0;
}
public boolean checkPidPgrpidForMatch() {
return false;
}
}
@Test
public void testCreateInstance() {
ResourceCalculatorProcessTree tree;
tree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree("1", EmptyProcessTree.class, new Configuration());
assertNotNull(tree);
assertThat(tree, instanceOf(EmptyProcessTree.class));
}
@Test
public void testCreatedInstanceConfigured() {
ResourceCalculatorProcessTree tree;
Configuration conf = new Configuration();
tree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree("1", EmptyProcessTree.class, conf);
assertNotNull(tree);
assertThat(tree.getConf(), sameInstance(conf));
}
}

View File

@ -253,12 +253,12 @@ public class NodeManager extends CompositeService implements
if (hasToReboot) {
LOG.info("Rebooting the node manager.");
NodeManager nodeManager = createNewNodeManager();
nodeManager.initAndStartNodeManager(hasToReboot);
nodeManager.initAndStartNodeManager(this.getConfig(), hasToReboot);
}
}
}
private void initAndStartNodeManager(boolean hasToReboot) {
private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
try {
// Remove the old hook if we are rebooting.
@ -270,7 +270,6 @@ public class NodeManager extends CompositeService implements
ShutdownHookManager.get().addShutdownHook(nodeManagerShutdownHook,
SHUTDOWN_HOOK_PRIORITY);
YarnConfiguration conf = new YarnConfiguration();
this.init(conf);
this.start();
} catch (Throwable t) {
@ -288,6 +287,7 @@ public class NodeManager extends CompositeService implements
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
NodeManager nodeManager = new NodeManager();
nodeManager.initAndStartNodeManager(false);
Configuration conf = new YarnConfiguration();
nodeManager.initAndStartNodeManager(conf, false);
}
}

View File

@ -50,7 +50,7 @@
<directory>${basedir}/src/test/resources</directory>
</testResource>
<testResource>
<directory>${basedir}/../../conf</directory>
<directory>${basedir}/conf</directory>
<includes>
<include>capacity-scheduler.xml</include>
</includes>

View File

@ -50,6 +50,12 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
*/
public CSQueue getParent();
/**
* Set the parent <code>Queue</code>.
* @param newParentQueue new parent queue
*/
public void setParent(CSQueue newParentQueue);
/**
* Get the queue name.
* @return the queue name
@ -195,10 +201,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
/**
* Reinitialize the queue.
* @param queue new queue to re-initalize from
* @param newlyParsedQueue new queue to re-initalize from
* @param clusterResource resources in the cluster
*/
public void reinitialize(CSQueue queue, Resource clusterResource)
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException;
/**

View File

@ -45,7 +45,6 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@ -111,21 +110,18 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
}
};
@Override
public void setConf(Configuration conf) {
if (conf instanceof YarnConfiguration) {
yarnConf = (YarnConfiguration) conf;
} else {
throw new IllegalArgumentException("Can only configure with " +
"YarnConfiguration");
}
yarnConf = conf;
}
@Override
public Configuration getConf() {
return yarnConf;
}
private CapacitySchedulerConfiguration conf;
private YarnConfiguration yarnConf;
private Configuration yarnConf;
private RMContext rmContext;
private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();

View File

@ -223,7 +223,7 @@ public class LeafQueue implements CSQueue {
{
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
float absCapacity = parent.getAbsoluteCapacity() * capacity;
float absCapacity = getParent().getAbsoluteCapacity() * capacity;
CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absCapacity, absoluteMaxCapacity);
this.capacity = capacity;
@ -256,7 +256,7 @@ public class LeafQueue implements CSQueue {
// Update metrics
CSQueueUtils.updateQueueStatistics(
this, parent, clusterResource, minimumAllocation);
this, getParent(), clusterResource, minimumAllocation);
LOG.info("Initializing " + queueName + "\n" +
"capacity = " + capacity +
@ -339,10 +339,15 @@ public class LeafQueue implements CSQueue {
}
@Override
public CSQueue getParent() {
public synchronized CSQueue getParent() {
return parent;
}
@Override
public synchronized void setParent(CSQueue newParentQueue) {
this.parent = (ParentQueue)newParentQueue;
}
@Override
public String getQueueName() {
return queueName;
@ -350,7 +355,7 @@ public class LeafQueue implements CSQueue {
@Override
public String getQueuePath() {
return parent.getQueuePath() + "." + getQueueName();
return getParent().getQueuePath() + "." + getQueueName();
}
/**
@ -430,7 +435,9 @@ public class LeafQueue implements CSQueue {
synchronized void setMaxCapacity(float maximumCapacity) {
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
float absMaxCapacity =
CSQueueUtils.computeAbsoluteMaximumCapacity(
maximumCapacity, getParent());
CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity);
this.maximumCapacity = maximumCapacity;
@ -453,10 +460,6 @@ public class LeafQueue implements CSQueue {
this.userLimitFactor = userLimitFactor;
}
synchronized void setParentQueue(CSQueue parent) {
this.parent = parent;
}
@Override
public synchronized int getNumApplications() {
return getNumPendingApplications() + getNumActiveApplications();
@ -559,26 +562,28 @@ public class LeafQueue implements CSQueue {
}
@Override
public synchronized void reinitialize(CSQueue queue, Resource clusterResource)
public synchronized void reinitialize(
CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException {
// Sanity check
if (!(queue instanceof LeafQueue) ||
!queue.getQueuePath().equals(getQueuePath())) {
if (!(newlyParsedQueue instanceof LeafQueue) ||
!newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
throw new IOException("Trying to reinitialize " + getQueuePath() +
" from " + queue.getQueuePath());
" from " + newlyParsedQueue.getQueuePath());
}
LeafQueue leafQueue = (LeafQueue)queue;
LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue;
setupQueueConfigs(
clusterResource,
leafQueue.capacity, leafQueue.absoluteCapacity,
leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity,
leafQueue.userLimit, leafQueue.userLimitFactor,
leafQueue.maxApplications,
leafQueue.getMaxApplicationsPerUser(),
leafQueue.getMaximumActiveApplications(),
leafQueue.getMaximumActiveApplicationsPerUser(),
leafQueue.state, leafQueue.acls);
newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity,
newlyParsedLeafQueue.maximumCapacity,
newlyParsedLeafQueue.absoluteMaxCapacity,
newlyParsedLeafQueue.userLimit, newlyParsedLeafQueue.userLimitFactor,
newlyParsedLeafQueue.maxApplications,
newlyParsedLeafQueue.getMaxApplicationsPerUser(),
newlyParsedLeafQueue.getMaximumActiveApplications(),
newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls);
}
@Override
@ -591,7 +596,7 @@ public class LeafQueue implements CSQueue {
}
// Check if parent-queue allows access
return parent.hasAccess(acl, user);
return getParent().hasAccess(acl, user);
}
@Override
@ -649,10 +654,10 @@ public class LeafQueue implements CSQueue {
// Inform the parent queue
try {
parent.submitApplication(application, userName, queue);
getParent().submitApplication(application, userName, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
parent.getQueuePath(), ace);
getParent().getQueuePath(), ace);
removeApplication(application, user);
throw ace;
}
@ -708,7 +713,7 @@ public class LeafQueue implements CSQueue {
}
// Inform the parent queue
parent.finishApplication(application, queue);
getParent().finishApplication(application, queue);
}
public synchronized void removeApplication(FiCaSchedulerApp application, User user) {
@ -1183,34 +1188,32 @@ public class LeafQueue implements CSQueue {
return (rmContainer != null) ? rmContainer.getContainer() :
createContainer(application, node, capability, priority);
}
public Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node,
Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node,
Resource capability, Priority priority) {
NodeId nodeId = node.getRMNode().getNodeID();
ContainerId containerId = BuilderUtils.newContainerId(application
.getApplicationAttemptId(), application.getNewContainerId());
ContainerToken containerToken = null;
// If security is enabled, send the container-tokens too.
if (UserGroupInformation.isSecurityEnabled()) {
containerToken =
containerTokenSecretManager.createContainerToken(containerId, nodeId,
application.getUser(), capability);
if (containerToken == null) {
return null; // Try again later.
}
}
// Create the container
Container container = BuilderUtils.newContainer(containerId, nodeId,
node.getRMNode().getHttpAddress(), capability, priority,
containerToken);
null);
return container;
}
/**
* Create <code>ContainerToken</code>, only in secure-mode
*/
ContainerToken createContainerToken(
FiCaSchedulerApp application, Container container) {
return containerTokenSecretManager.createContainerToken(
container.getId(), container.getNodeId(),
application.getUser(), container.getResource());
}
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer) {
@ -1246,6 +1249,17 @@ public class LeafQueue implements CSQueue {
unreserve(application, priority, node, rmContainer);
}
// Create container tokens in secure-mode
if (UserGroupInformation.isSecurityEnabled()) {
ContainerToken containerToken =
createContainerToken(application, container);
if (containerToken == null) {
// Something went wrong...
return Resources.none();
}
container.setContainerToken(containerToken);
}
// Inform the application
RMContainer allocatedContainer =
application.allocate(type, node, priority, request, container);
@ -1351,7 +1365,7 @@ public class LeafQueue implements CSQueue {
}
// Inform the parent queue
parent.completedContainer(clusterResource, application,
getParent().completedContainer(clusterResource, application,
node, rmContainer, null, event);
}
}
@ -1361,7 +1375,7 @@ public class LeafQueue implements CSQueue {
// Update queue metrics
Resources.addTo(usedResources, resource);
CSQueueUtils.updateQueueStatistics(
this, parent, clusterResource, minimumAllocation);
this, getParent(), clusterResource, minimumAllocation);
++numContainers;
// Update user metrics
@ -1386,7 +1400,7 @@ public class LeafQueue implements CSQueue {
// Update queue metrics
Resources.subtractFrom(usedResources, resource);
CSQueueUtils.updateQueueStatistics(
this, parent, clusterResource, minimumAllocation);
this, getParent(), clusterResource, minimumAllocation);
--numContainers;
// Update user metrics
@ -1417,7 +1431,7 @@ public class LeafQueue implements CSQueue {
// Update metrics
CSQueueUtils.updateQueueStatistics(
this, parent, clusterResource, minimumAllocation);
this, getParent(), clusterResource, minimumAllocation);
// Update application properties
for (FiCaSchedulerApp application : activeApplications) {
@ -1488,7 +1502,7 @@ public class LeafQueue implements CSQueue {
synchronized (this) {
allocateResource(clusterResource, application, container.getResource());
}
parent.recoverContainer(clusterResource, application, container);
getParent().recoverContainer(clusterResource, application, container);
}

View File

@ -60,7 +60,7 @@ public class ParentQueue implements CSQueue {
private static final Log LOG = LogFactory.getLog(ParentQueue.class);
private final CSQueue parent;
private CSQueue parent;
private final String queueName;
private float capacity;
@ -216,10 +216,15 @@ public class ParentQueue implements CSQueue {
}
@Override
public CSQueue getParent() {
public synchronized CSQueue getParent() {
return parent;
}
@Override
public synchronized void setParent(CSQueue newParentQueue) {
this.parent = (ParentQueue)newParentQueue;
}
@Override
public String getQueueName() {
return queueName;
@ -357,37 +362,52 @@ public class ParentQueue implements CSQueue {
}
@Override
public synchronized void reinitialize(CSQueue queue, Resource clusterResource)
public synchronized void reinitialize(
CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException {
// Sanity check
if (!(queue instanceof ParentQueue) ||
!queue.getQueuePath().equals(getQueuePath())) {
if (!(newlyParsedQueue instanceof ParentQueue) ||
!newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
throw new IOException("Trying to reinitialize " + getQueuePath() +
" from " + queue.getQueuePath());
" from " + newlyParsedQueue.getQueuePath());
}
ParentQueue parentQueue = (ParentQueue)queue;
ParentQueue newlyParsedParentQueue = (ParentQueue)newlyParsedQueue;
// Set new configs
setupQueueConfigs(clusterResource,
parentQueue.capacity, parentQueue.absoluteCapacity,
parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity,
parentQueue.state, parentQueue.acls);
newlyParsedParentQueue.capacity,
newlyParsedParentQueue.absoluteCapacity,
newlyParsedParentQueue.maximumCapacity,
newlyParsedParentQueue.absoluteMaxCapacity,
newlyParsedParentQueue.state,
newlyParsedParentQueue.acls);
// Re-configure existing child queues and add new ones
// The CS has already checked to ensure all existing child queues are present!
Map<String, CSQueue> currentChildQueues = getQueues(childQueues);
Map<String, CSQueue> newChildQueues = getQueues(parentQueue.childQueues);
Map<String, CSQueue> newChildQueues =
getQueues(newlyParsedParentQueue.childQueues);
for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
String newChildQueueName = e.getKey();
CSQueue newChildQueue = e.getValue();
CSQueue childQueue = currentChildQueues.get(newChildQueueName);
if (childQueue != null){
// Check if the child-queue already exists
if (childQueue != null) {
// Re-init existing child queues
childQueue.reinitialize(newChildQueue, clusterResource);
LOG.info(getQueueName() + ": re-configured queue: " + childQueue);
} else {
// New child queue, do not re-init
// Set parent to 'this'
newChildQueue.setParent(this);
// Save in list of current child queues
currentChildQueues.put(newChildQueueName, newChildQueue);
LOG.info(getQueueName() + ": added new child queue: " + newChildQueue);
}
}

View File

@ -378,4 +378,43 @@ public class TestCapacityScheduler {
Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory());
}
@Test
public void testRefreshQueuesWithNewQueue() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new ClientToAMTokenSecretManagerInRM()));
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
// Add a new queue b4
String B4 = B + ".b4";
float B4_CAPACITY = 10;
B3_CAPACITY -= B4_CAPACITY;
try {
conf.setCapacity(A, 80f);
conf.setCapacity(B, 20f);
conf.setQueues(B, new String[] {"b1", "b2", "b3", "b4"});
conf.setCapacity(B1, B1_CAPACITY);
conf.setCapacity(B2, B2_CAPACITY);
conf.setCapacity(B3, B3_CAPACITY);
conf.setCapacity(B4, B4_CAPACITY);
cs.reinitialize(conf,null);
checkQueueCapacities(cs, 80f, 20f);
// Verify parent for B4
CSQueue rootQueue = cs.getRootQueue();
CSQueue queueB = findQueue(rootQueue, B);
CSQueue queueB4 = findQueue(queueB, B4);
assertEquals(queueB, queueB4.getParent());
} finally {
B3_CAPACITY += B4_CAPACITY;
}
}
}