diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
index c71f8f4dbbd..20436abfee8 100644
--- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
+++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
@@ -62,6 +62,13 @@
         <include>**/*</include>
       </includes>
     </fileSet>
+    <fileSet>
+      <directory>hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf</directory>
+      <outputDirectory>etc/hadoop</outputDirectory>
+      <includes>
+        <include>**/*</include>
+      </includes>
+    </fileSet>
     <fileSet>
       <directory>${basedir}</directory>
       <outputDirectory>/share/doc/hadoop/${hadoop.component}</outputDirectory>
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 84d573115f7..9c50f2c4ea1 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -375,11 +375,14 @@ Release 2.0.3-alpha - Unreleased
     (rkanter via tucu)

     HADOOP-8900. BuiltInGzipDecompressor throws IOException - stored gzip size
-    doesn't match decompressed size. (Slavik Krassovsky via suresh)
+    doesn't match decompressed size. (Andy Isaacson via suresh)

     HADOOP-8948. TestFileUtil.testGetDU fails on Windows due to incorrect
     assumption of line separator. (Chris Nauroth via suresh)

+    HADOOP-8951. RunJar to fail with user-comprehensible error
+    message if jar missing. (stevel via suresh)
+
 Release 2.0.2-alpha - 2012-09-07

   INCOMPATIBLE CHANGES
@@ -1091,6 +1094,12 @@ Release 0.23.5 - UNRELEASED
     HADOOP-8906. paths with multiple globs are unreliable. (Daryn Sharp via
     jlowe)

+    HADOOP-8811. Compile hadoop native library in FreeBSD (Radim Kolar via
+    bobby)
+
+    HADOOP-8962. RawLocalFileSystem.listStatus fails when a child filename
+    contains a colon (jlowe via bobby)
+
 Release 0.23.4 - UNRELEASED

   INCOMPATIBLE CHANGES
diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
index 68c63abae3b..c7f05e5c3bc 100644
--- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt
+++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
@@ -67,6 +67,9 @@ macro(set_find_shared_library_version LVERS)
     IF(${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
         # Mac OS uses .dylib
         SET(CMAKE_FIND_LIBRARY_SUFFIXES ".${LVERS}.dylib")
+    ELSEIF(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
+        # FreeBSD always installs plain .so libraries.
+        SET(CMAKE_FIND_LIBRARY_SUFFIXES ".so")
     ELSEIF(${CMAKE_SYSTEM_NAME} MATCHES "Windows")
         # Windows doesn't support finding shared libraries by version.
ELSE() @@ -95,8 +98,10 @@ GET_FILENAME_COMPONENT(HADOOP_ZLIB_LIBRARY ${ZLIB_LIBRARIES} NAME) INCLUDE(CheckFunctionExists) INCLUDE(CheckCSourceCompiles) +INCLUDE(CheckLibraryExists) CHECK_FUNCTION_EXISTS(sync_file_range HAVE_SYNC_FILE_RANGE) CHECK_FUNCTION_EXISTS(posix_fadvise HAVE_POSIX_FADVISE) +CHECK_LIBRARY_EXISTS(dl dlopen "" NEED_LINK_DL) SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES CMAKE_FIND_LIBRARY_SUFFIXES) set_find_shared_library_version("1") @@ -159,6 +164,9 @@ add_dual_library(hadoop ${D}/util/NativeCrc32.c ${D}/util/bulk_crc32.c ) +if (NEED_LINK_DL) + set(LIB_DL dl) +endif (NEED_LINK_DL) IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux") # @@ -171,7 +179,7 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux") ENDIF() target_link_dual_libraries(hadoop - dl + ${LIB_DL} ${JAVA_JVM_LIBRARY} ) SET(LIBHADOOP_VERSION "1.0.0") diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java index eba1e0c6c8b..2ea115bbaa7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java @@ -43,7 +43,8 @@ public class HardLink { OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, - OS_TYPE_MAC + OS_TYPE_MAC, + OS_TYPE_FREEBSD } public static OSType osType; @@ -63,7 +64,7 @@ public class HardLink { getHardLinkCommand = new HardLinkCGUnix(); //override getLinkCountCommand for the particular Unix variant //Linux is already set as the default - {"stat","-c%h", null} - if (osType == OSType.OS_TYPE_MAC) { + if (osType == OSType.OS_TYPE_MAC || osType == OSType.OS_TYPE_FREEBSD) { String[] linkCountCmdTemplate = {"/usr/bin/stat","-f%l", null}; HardLinkCGUnix.setLinkCountCmdTemplate(linkCountCmdTemplate); } else if (osType == OSType.OS_TYPE_SOLARIS) { @@ -95,6 +96,9 @@ public class HardLink { else if (osName.contains("Mac")) { return OSType.OS_TYPE_MAC; } + else if (osName.contains("FreeBSD")) { + return OSType.OS_TYPE_FREEBSD; + } else { return OSType.OS_TYPE_UNIX; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index 267510d364d..4c089f1a299 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -350,7 +350,7 @@ public class RawLocalFileSystem extends FileSystem { new RawLocalFileStatus(localf, getDefaultBlockSize(f), this) }; } - String[] names = localf.list(); + File[] names = localf.listFiles(); if (names == null) { return null; } @@ -358,7 +358,7 @@ public class RawLocalFileSystem extends FileSystem { int j = 0; for (int i = 0; i < names.length; i++) { try { - results[j] = getFileStatus(new Path(f, names[i])); + results[j] = getFileStatus(new Path(names[i].getAbsolutePath())); j++; } catch (FileNotFoundException e) { // ignore the files not found since the dir list may have have changed diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java index 7211b5bd706..ec1c6d84c0e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java +++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
@@ -126,6 +126,10 @@ public class RunJar {
     int firstArg = 0;
     String fileName = args[firstArg++];
     File file = new File(fileName);
+    if (!file.exists() || !file.isFile()) {
+      System.err.println("Not a valid JAR: " + file.getCanonicalPath());
+      System.exit(-1);
+    }
     String mainClassName = null;

     JarFile jarFile;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringInterner.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringInterner.java
new file mode 100644
index 00000000000..d087610a5d1
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringInterner.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+
+/**
+ * Provides equivalent behavior to String.intern() to optimize performance,
+ * without consuming memory in the permanent generation.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class StringInterner {
+
+  /**
+   * Retains a strong reference to each string instance it has interned.
+   */
+  private final static Interner<String> strongInterner;
+
+  /**
+   * Retains a weak reference to each string instance it has interned.
+   */
+  private final static Interner<String> weakInterner;
+
+
+
+  static {
+    strongInterner = Interners.newStrongInterner();
+    weakInterner = Interners.newWeakInterner();
+  }
+
+  /**
+   * Interns and returns a reference to the representative instance
+   * for any of a collection of string instances that are equal to each other.
+   * Retains strong reference to the instance,
+   * thus preventing it from being garbage-collected.
+   *
+   * @param sample string instance to be interned
+   * @return strong reference to interned string instance
+   */
+  public static String strongIntern(String sample) {
+    return strongInterner.intern(sample);
+  }
+
+  /**
+   * Interns and returns a reference to the representative instance
+   * for any of a collection of string instances that are equal to each other.
+   * Retains weak reference to the instance,
+   * and so does not prevent it from being garbage-collected.
+ * + * @param sample string instance to be interned + * @return weak reference to interned string instance + */ + public static String weakIntern(String sample) { + return weakInterner.intern(sample); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c index 139ddafecaa..4a91d0af954 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c @@ -254,7 +254,11 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_posix_1fadvise( int err = 0; if ((err = posix_fadvise(fd, (off_t)offset, (off_t)len, flags))) { +#ifdef __FreeBSD__ + throw_ioe(env, errno); +#else throw_ioe(env, err); +#endif } #endif } @@ -310,6 +314,22 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_sync_1file_1range( #endif } +#ifdef __FreeBSD__ +static int toFreeBSDFlags(int flags) +{ + int rc = flags & 03; + if ( flags & 0100 ) rc |= O_CREAT; + if ( flags & 0200 ) rc |= O_EXCL; + if ( flags & 0400 ) rc |= O_NOCTTY; + if ( flags & 01000 ) rc |= O_TRUNC; + if ( flags & 02000 ) rc |= O_APPEND; + if ( flags & 04000 ) rc |= O_NONBLOCK; + if ( flags &010000 ) rc |= O_SYNC; + if ( flags &020000 ) rc |= O_ASYNC; + return rc; +} +#endif + /* * public static native FileDescriptor open(String path, int flags, int mode); */ @@ -318,6 +338,9 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_open( JNIEnv *env, jclass clazz, jstring j_path, jint flags, jint mode) { +#ifdef __FreeBSD__ + flags = toFreeBSDFlags(flags); +#endif jobject ret = NULL; const char *path = (*env)->GetStringUTFChars(env, j_path, NULL); @@ -399,7 +422,7 @@ err: * Determine how big a buffer we need for reentrant getpwuid_r and getgrnam_r */ ssize_t get_pw_buflen() { - size_t ret = 0; + long ret = 0; #ifdef _SC_GETPW_R_SIZE_MAX ret = sysconf(_SC_GETPW_R_SIZE_MAX); #endif diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/security/JniBasedUnixGroupsNetgroupMapping.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/security/JniBasedUnixGroupsNetgroupMapping.c index 39458f36177..1177d728221 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/security/JniBasedUnixGroupsNetgroupMapping.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/security/JniBasedUnixGroupsNetgroupMapping.c @@ -46,6 +46,7 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_security_JniBasedUnixGroupsNetgroupMapping_getUsersForNetgroupJNI (JNIEnv *env, jobject jobj, jstring jgroup) { UserList *userListHead = NULL; + UserList *current = NULL; int userListSize = 0; // pointers to free at the end @@ -72,8 +73,10 @@ Java_org_apache_hadoop_security_JniBasedUnixGroupsNetgroupMapping_getUsersForNet // was successful or not (as long as it was called we need to call // endnetgrent) setnetgrentCalledFlag = 1; +#ifndef __FreeBSD__ if(setnetgrent(cgroup) == 1) { - UserList *current = NULL; +#endif + current = NULL; // three pointers are for host, user, domain, we only care // about user now char *p[3]; @@ -87,7 +90,9 @@ Java_org_apache_hadoop_security_JniBasedUnixGroupsNetgroupMapping_getUsersForNet userListSize++; } } +#ifndef __FreeBSD__ } +#endif //-------------------------------------------------- // build return data (java array) @@ -101,7 +106,7 @@ 
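For context, a minimal usage sketch (not part of the patch) of the StringInterner utility added above; the class name InternExample is hypothetical, and the behaviour mirrors what TestStringInterner later in this patch asserts:

import org.apache.hadoop.util.StringInterner;

public class InternExample {
  public static void main(String[] args) {
    // Two equal but distinct String instances...
    String a = new String("FILE_BYTES_READ");
    String b = new String("FILE_BYTES_READ");
    // ...collapse to a single representative instance once interned.
    // weakIntern() holds the canonical copy weakly, so it can still be
    // garbage-collected; strongIntern() pins it for the life of the JVM.
    System.out.println(StringInterner.weakIntern(a) == StringInterner.weakIntern(b));     // true
    System.out.println(StringInterner.strongIntern(a) == StringInterner.strongIntern(b)); // true
  }
}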
Java_org_apache_hadoop_security_JniBasedUnixGroupsNetgroupMapping_getUsersForNet goto END; } - UserList * current = NULL; + current = NULL; // note that the loop iterates over list but also over array (i) int i = 0; diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/security/getGroup.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/security/getGroup.c index 2b558c54fe9..f19ec79e16b 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/security/getGroup.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/security/getGroup.c @@ -78,7 +78,7 @@ int getGroupIDList(const char *user, int *ngroups, gid_t **groups) { */ int getGroupDetails(gid_t group, char **grpBuf) { struct group * grp = NULL; - size_t currBufferSize = sysconf(_SC_GETGR_R_SIZE_MAX); + long currBufferSize = sysconf(_SC_GETGR_R_SIZE_MAX); if (currBufferSize < 1024) { currBufferSize = 1024; } @@ -123,7 +123,7 @@ int getGroupDetails(gid_t group, char **grpBuf) { */ int getPW(const char *user, char **pwbuf) { struct passwd *pwbufp = NULL; - size_t currBufferSize = sysconf(_SC_GETPW_R_SIZE_MAX); + long currBufferSize = sysconf(_SC_GETPW_R_SIZE_MAX); if (currBufferSize < 1024) { currBufferSize = 1024; } diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/bulk_crc32.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/bulk_crc32.c index 7009bf1f5cc..74f79dd35dd 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/bulk_crc32.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/bulk_crc32.c @@ -32,7 +32,9 @@ #include "bulk_crc32.h" #include "gcc_optimizations.h" +#ifndef __FreeBSD__ #define USE_PIPELINED +#endif #define CRC_INITIAL_VAL 0xffffffff @@ -260,7 +262,7 @@ static uint32_t crc32_zlib_sb8( // Begin code for SSE4.2 specific hardware support of CRC32C /////////////////////////////////////////////////////////////////////////// -#if (defined(__amd64__) || defined(__i386)) && defined(__GNUC__) +#if (defined(__amd64__) || defined(__i386)) && defined(__GNUC__) && !defined(__FreeBSD__) # define SSE42_FEATURE_BIT (1 << 20) # define CPUID_FEATURES 1 /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHardLink.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHardLink.java index ff1d099438b..3b769472466 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHardLink.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHardLink.java @@ -364,8 +364,12 @@ public class TestHardLink { callCount = createHardLinkMult(src, fileNames, tgt_mult, maxLength); //check the request was completed in exactly two "chunks" assertEquals(2, callCount); + String[] tgt_multNames = tgt_mult.list(); + //sort directory listings before comparsion + Arrays.sort(fileNames); + Arrays.sort(tgt_multNames); //and check the results were as expected in the dir tree - assertTrue(Arrays.deepEquals(fileNames, tgt_mult.list())); + assertArrayEquals(fileNames, tgt_multNames); //Test the case where maxlength is too small even for one filename. //It should go ahead and try the single files. 
@@ -382,8 +386,12 @@ public class TestHardLink { maxLength); //should go ahead with each of the three single file names assertEquals(3, callCount); - //check the results were as expected in the dir tree - assertTrue(Arrays.deepEquals(fileNames, tgt_mult.list())); + tgt_multNames = tgt_mult.list(); + //sort directory listings before comparsion + Arrays.sort(fileNames); + Arrays.sort(tgt_multNames); + //and check the results were as expected in the dir tree + assertArrayEquals(fileNames, tgt_multNames); } /* diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java index e411314b85e..eb3d33df377 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java @@ -249,6 +249,7 @@ public class TestLocalFileSystem { assertEquals(1, fileSchemeCount); } + @Test public void testHasFileDescriptor() throws IOException { Configuration conf = new Configuration(); LocalFileSystem fs = FileSystem.getLocal(conf); @@ -258,4 +259,17 @@ public class TestLocalFileSystem { new RawLocalFileSystem().new LocalFSFileInputStream(path), 1024); assertNotNull(bis.getFileDescriptor()); } + + @Test + public void testListStatusWithColons() throws IOException { + Configuration conf = new Configuration(); + LocalFileSystem fs = FileSystem.getLocal(conf); + File colonFile = new File(TEST_ROOT_DIR, "foo:bar"); + colonFile.mkdirs(); + colonFile.createNewFile(); + FileStatus[] stats = fs.listStatus(new Path(TEST_ROOT_DIR)); + assertEquals("Unexpected number of stats", 1, stats.length); + assertEquals("Bad path from stat", colonFile.getAbsolutePath(), + stats[0].getPath().toUri().getPath()); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java index 6a911ed804d..f947e02efb4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java @@ -224,7 +224,10 @@ public class TestNativeIO { // we should just skip the unit test on machines where we don't // have fadvise support assumeTrue(false); - } finally { + } catch (NativeIOException nioe) { + // ignore this error as FreeBSD returns EBADF even if length is zero + } + finally { fis.close(); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestStringInterner.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestStringInterner.java new file mode 100644 index 00000000000..e43da49d75d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestStringInterner.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import static org.junit.Assert.*; +import static org.apache.hadoop.util.StringInterner.*; + +import org.junit.Test; + +/** + * + * Tests string interning {@link StringInterner} + */ +public class TestStringInterner { + + /** + * Test different references are returned for any of string + * instances that are equal to each other but not interned. + */ + @Test + public void testNoIntern() { + String literalABC = "ABC"; + String substringABC = "ABCDE".substring(0,3); + String heapABC = new String("ABC"); + assertNotSame(literalABC, substringABC); + assertNotSame(literalABC, heapABC); + assertNotSame(substringABC, heapABC); + } + + + /** + * Test the same strong reference is returned for any + * of string instances that are equal to each other. + */ + @Test + public void testStrongIntern() { + String strongInternLiteralABC = strongIntern("ABC"); + String strongInternSubstringABC = strongIntern("ABCDE".substring(0,3)); + String strongInternHeapABC = strongIntern(new String("ABC")); + assertSame(strongInternLiteralABC, strongInternSubstringABC); + assertSame(strongInternLiteralABC, strongInternHeapABC); + assertSame(strongInternSubstringABC, strongInternHeapABC); + } + + + /** + * Test the same weak reference is returned for any + * of string instances that are equal to each other. + */ + @Test + public void testWeakIntern() { + String weakInternLiteralABC = weakIntern("ABC"); + String weakInternSubstringABC = weakIntern("ABCDE".substring(0,3)); + String weakInternHeapABC = weakIntern(new String("ABC")); + assertSame(weakInternLiteralABC, weakInternSubstringABC); + assertSame(weakInternLiteralABC, weakInternHeapABC); + assertSame(weakInternSubstringABC, weakInternHeapABC); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 0d19eebdf6c..5b20cd7cbb3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -230,6 +230,9 @@ Trunk (Unreleased) HADOOP-8158. Interrupting hadoop fs -put from the command line causes a LeaseExpiredException. (daryn via harsh) + HDFS-2434. TestNameNodeMetrics.testCorruptBlock fails intermittently. + (Jing Zhao via suresh) + BREAKDOWN OF HDFS-3077 SUBTASKS HDFS-3077. Quorum-based protocol for reading and writing edit logs. @@ -408,6 +411,8 @@ Release 2.0.3-alpha - Unreleased HDFS-4088. Remove "throws QuotaExceededException" from an INodeDirectoryWithQuota constructor. (szetszwo) + HDFS-4099. Clean up replication code and add more javadoc. (szetszwo) + OPTIMIZATIONS BUG FIXES @@ -480,7 +485,8 @@ Release 2.0.3-alpha - Unreleased HDFS-4072. On file deletion remove corresponding blocks pending replications. (Jing Zhao via suresh) - HDFS-4022. Replication not happening for appended block. (Vinay via umamahesh) + HDFS-4022. Replication not happening for appended block. + (Vinay via umamahesh) Release 2.0.2-alpha - 2012-09-07 @@ -1879,6 +1885,9 @@ Release 0.23.5 - UNRELEASED HDFS-3224. Bug in check for DN re-registration with different storage ID (jlowe) + HDFS-4090. 
getFileChecksum() result incompatible when called against + zero-byte files. (Kihwal Lee via daryn) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 47033bca5f0..5d8e957de79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -1769,6 +1769,13 @@ public class DFSClient implements java.io.Closeable { return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC, crcPerBlock, fileMD5); default: + // If there is no block allocated for the file, + // return one with the magic entry that matches what previous + // hdfs versions return. + if (locatedblocks.size() == 0) { + return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5); + } + // we should never get here since the validity was checked // when getCrcType() was called above. return null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 81ec0b5e2e1..e7aff0d61e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import static org.apache.hadoop.util.ExitUtil.terminate; + import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; @@ -49,14 +51,11 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; - -import static org.apache.hadoop.util.ExitUtil.terminate; - import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.namenode.FSClusterStats; @@ -2833,28 +2832,32 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block } } - public void checkReplication(Block block, short numExpectedReplicas) { - // filter out containingNodes that are marked for decommission. - NumberReplicas number = countNodes(block); - if (isNeededReplication(block, numExpectedReplicas, number.liveReplicas())) { - neededReplications.add(block, - number.liveReplicas(), - number.decommissionedReplicas(), - numExpectedReplicas); - return; - } - if (number.liveReplicas() > numExpectedReplicas) { - processOverReplicatedBlock(block, numExpectedReplicas, null, null); + /** + * Check replication of the blocks in the collection. + * If any block is needed replication, insert it into the replication queue. 
+ * Otherwise, if the block is more than the expected replication factor, + * process it as an over replicated block. + */ + public void checkReplication(BlockCollection bc) { + final short expected = bc.getBlockReplication(); + for (Block block : bc.getBlocks()) { + final NumberReplicas n = countNodes(block); + if (isNeededReplication(block, expected, n.liveReplicas())) { + neededReplications.add(block, n.liveReplicas(), + n.decommissionedReplicas(), expected); + } else if (n.liveReplicas() > expected) { + processOverReplicatedBlock(block, expected, null, null); + } } } - /* get replication factor of a block */ + /** + * @return 0 if the block is not found; + * otherwise, return the replication factor of the block. + */ private int getReplication(Block block) { - BlockCollection bc = blocksMap.getBlockCollection(block); - if (bc == null) { // block does not belong to any file - return 0; - } - return bc.getBlockReplication(); + final BlockCollection bc = blocksMap.getBlockCollection(block); + return bc == null? 0: bc.getBlockReplication(); } @@ -2929,12 +2932,12 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block return enoughRacks; } - boolean isNeededReplication(Block b, int expectedReplication, int curReplicas) { - if ((curReplicas >= expectedReplication) && (blockHasEnoughRacks(b))) { - return false; - } else { - return true; - } + /** + * A block needs replication if the number of replicas is less than expected + * or if it does not have enough racks. + */ + private boolean isNeededReplication(Block b, int expected, int current) { + return current < expected || !blockHasEnoughRacks(b); } public long getMissingBlocksCount() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 52e47bf071a..91ef3a8e2fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2433,21 +2433,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return true; } - /** - * Check all blocks of a file. If any blocks are lower than their intended - * replication factor, then insert them into neededReplication and if - * the blocks are more than the intended replication factor then insert - * them into invalidateBlocks. 
- */ - private void checkReplicationFactor(INodeFile file) { - short numExpectedReplicas = file.getBlockReplication(); - Block[] pendingBlocks = file.getBlocks(); - int nrBlocks = pendingBlocks.length; - for (int i = 0; i < nrBlocks; i++) { - blockManager.checkReplication(pendingBlocks[i], numExpectedReplicas); - } - } - /** * Allocate a block at the given pending filename * @@ -3180,7 +3165,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, // close file and persist block allocations for this file dir.closeFile(src, newFile); - checkReplicationFactor(newFile); + blockManager.checkReplication(newFile); } void commitBlockSynchronization(ExtendedBlock lastblock, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 9d2edc65832..0aec4960554 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -542,6 +542,21 @@ public class TestDistributedFileSystem { final FileChecksum webhdfs_qfoocs = webhdfs.getFileChecksum(webhdfsqualified); System.out.println("webhdfs_qfoocs=" + webhdfs_qfoocs); + //create a zero byte file + final Path zeroByteFile = new Path(dir, "zeroByteFile" + n); + { + final FSDataOutputStream out = hdfs.create(zeroByteFile, false, buffer_size, + (short)2, block_size); + out.close(); + } + + // verify the magic val for zero byte files + { + final FileChecksum zeroChecksum = hdfs.getFileChecksum(zeroByteFile); + assertEquals(zeroChecksum.toString(), + "MD5-of-0MD5-of-0CRC32:70bc8f4b72a86921468bf8e8441dce51"); + } + //write another file final Path bar = new Path(dir, "bar" + n); { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java index e213379151c..d4184e66cb7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java @@ -205,6 +205,12 @@ public class TestNameNodeMetrics { final Path file = getTestPath("testCorruptBlock"); createFile(file, 100, (short)2); + // Disable the heartbeats, so that no corrupted replica + // can be fixed + for (DataNode dn : cluster.getDataNodes()) { + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true); + } + // Corrupt first replica of the block LocatedBlock block = NameNodeAdapter.getBlockLocations( cluster.getNameNode(), file.toString(), 0, 1).get(0); @@ -215,12 +221,23 @@ public class TestNameNodeMetrics { } finally { cluster.getNamesystem().writeUnlock(); } - Thread.sleep(1000); // Wait for block to be marked corrupt + BlockManagerTestUtil.getComputedDatanodeWork(bm); + BlockManagerTestUtil.updateState(bm); MetricsRecordBuilder rb = getMetrics(NS_METRICS); assertGauge("CorruptBlocks", 1L, rb); assertGauge("PendingReplicationBlocks", 1L, rb); assertGauge("ScheduledReplicationBlocks", 1L, rb); + fs.delete(file, true); + // During the file deletion, both BlockManager#corruptReplicas and + // BlockManager#pendingReplications will be updated, i.e., the records + // for the blocks of the deleted file will be removed 
from both + // corruptReplicas and pendingReplications. The corresponding + // metrics (CorruptBlocks and PendingReplicationBlocks) will only be updated + // when BlockManager#computeDatanodeWork is run where the + // BlockManager#udpateState is called. And in + // BlockManager#computeDatanodeWork the metric ScheduledReplicationBlocks + // will also be updated. rb = waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L); assertGauge("PendingReplicationBlocks", 0L, rb); assertGauge("ScheduledReplicationBlocks", 0L, rb); diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 56641fe122b..9b7828d6569 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -189,6 +189,9 @@ Release 2.0.3-alpha - Unreleased MAPREDUCE-4736. Remove obsolete option [-rootDir] from TestDFSIO. (Brandon Li via suresh) + MAPREDUCE-4637. Handle TaskAttempt diagnostic updates while in the NEW and + UNASSIGNED states. (Mayank Bansal via sseth) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES @@ -599,6 +602,14 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many reducers complete consecutively. (Jason Lowe via vinodkv) + MAPREDUCE-4740. only .jars can be added to the Distributed Cache + classpath. (Robert Joseph Evans via jlowe) + + MAPREDUCE-4229. Intern counter names in the JT (Miomir Boljanovic and bobby via daryn) + + MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown. + (Vinod Kumar Vavilapalli via jlowe) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 23f436f46cd..1191f8d789b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -107,6 +107,8 @@ import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.util.ConverterUtils; +import com.google.common.annotations.VisibleForTesting; + /** * The Map-Reduce Application Master. * The state machine is encapsulated in the implementation of Job interface. @@ -398,52 +400,65 @@ public class MRAppMaster extends CompositeService { protected void sysexit() { System.exit(0); } - + + @VisibleForTesting + public void shutDownJob() { + // job has finished + // this is the only job, so shut down the Appmaster + // note in a workflow scenario, this may lead to creation of a new + // job (FIXME?) + // Send job-end notification + if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { + try { + LOG.info("Job end notification started for jobID : " + + job.getReport().getJobId()); + JobEndNotifier notifier = new JobEndNotifier(); + notifier.setConf(getConfig()); + notifier.notify(job.getReport()); + } catch (InterruptedException ie) { + LOG.warn("Job end notification interrupted for jobID : " + + job.getReport().getJobId(), ie); + } + } + + // TODO:currently just wait for some time so clients can know the + // final states. Will be removed once RM come on. 
+ try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + try { + //We are finishing cleanly so this is the last retry + isLastAMRetry = true; + // Stop all services + // This will also send the final report to the ResourceManager + LOG.info("Calling stop for all the services"); + MRAppMaster.this.stop(); + + } catch (Throwable t) { + LOG.warn("Graceful stop failed ", t); + } + + //Bring the process down by force. + //Not needed after HADOOP-7140 + LOG.info("Exiting MR AppMaster..GoodBye!"); + sysexit(); + } + private class JobFinishEventHandler implements EventHandler { @Override public void handle(JobFinishEvent event) { - // job has finished - // this is the only job, so shut down the Appmaster - // note in a workflow scenario, this may lead to creation of a new - // job (FIXME?) - // Send job-end notification - if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { - try { - LOG.info("Job end notification started for jobID : " - + job.getReport().getJobId()); - JobEndNotifier notifier = new JobEndNotifier(); - notifier.setConf(getConfig()); - notifier.notify(job.getReport()); - } catch (InterruptedException ie) { - LOG.warn("Job end notification interrupted for jobID : " - + job.getReport().getJobId(), ie); + // Create a new thread to shutdown the AM. We should not do it in-line + // to avoid blocking the dispatcher itself. + new Thread() { + + @Override + public void run() { + shutDownJob(); } - } - - // TODO:currently just wait for some time so clients can know the - // final states. Will be removed once RM come on. - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - try { - //We are finishing cleanly so this is the last retry - isLastAMRetry = true; - // Stop all services - // This will also send the final report to the ResourceManager - LOG.info("Calling stop for all the services"); - stop(); - - } catch (Throwable t) { - LOG.warn("Graceful stop failed ", t); - } - - //Bring the process down by force. - //Not needed after HADOOP-7140 - LOG.info("Exiting MR AppMaster..GoodBye!"); - sysexit(); + }.start(); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 687edc379cf..a39a1bc6b94 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -200,6 +200,10 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_KILL, new KilledTransition()) .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_FAILMSG, new FailedTransition()) + .addTransition(TaskAttemptStateInternal.NEW, + TaskAttemptStateInternal.NEW, + TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, + DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // Transitions from the UNASSIGNED state. 
.addTransition(TaskAttemptStateInternal.UNASSIGNED, @@ -211,6 +215,10 @@ public abstract class TaskAttemptImpl implements .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition( TaskAttemptStateInternal.FAILED, true)) + .addTransition(TaskAttemptStateInternal.UNASSIGNED, + TaskAttemptStateInternal.UNASSIGNED, + TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, + DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // Transitions from the ASSIGNED state. .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 3144ab179c9..fa97d692ee3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -30,6 +30,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -81,6 +82,7 @@ public class ContainerLauncherImpl extends AbstractService implements protected BlockingQueue eventQueue = new LinkedBlockingQueue(); YarnRPC rpc; + private final AtomicBoolean stopped; private Container getContainer(ContainerLauncherEvent event) { ContainerId id = event.getContainerID(); @@ -237,6 +239,7 @@ public class ContainerLauncherImpl extends AbstractService implements public ContainerLauncherImpl(AppContext context) { super(ContainerLauncherImpl.class.getName()); this.context = context; + this.stopped = new AtomicBoolean(false); } @Override @@ -271,11 +274,13 @@ public class ContainerLauncherImpl extends AbstractService implements @Override public void run() { ContainerLauncherEvent event = null; - while (!Thread.currentThread().isInterrupted()) { + while (!stopped.get() && !Thread.currentThread().isInterrupted()) { try { event = eventQueue.take(); } catch (InterruptedException e) { - LOG.error("Returning, interrupted : " + e); + if (!stopped.get()) { + LOG.error("Returning, interrupted : " + e); + } return; } int poolSize = launcherPool.getCorePoolSize(); @@ -324,6 +329,10 @@ public class ContainerLauncherImpl extends AbstractService implements } public void stop() { + if (stopped.getAndSet(true)) { + // return if already stopped + return; + } // shutdown any containers that might be left running shutdownAllContainers(); eventHandlingThread.interrupt(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 63e92467ea9..9f594c09e86 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -67,7 +67,7 @@ public abstract class RMCommunicator extends AbstractService { private int rmPollInterval;//millis protected ApplicationId applicationId; protected ApplicationAttemptId applicationAttemptId; - private AtomicBoolean stopped; + private final AtomicBoolean stopped; protected Thread allocatorThread; @SuppressWarnings("rawtypes") protected EventHandler eventHandler; @@ -239,7 +239,9 @@ public abstract class RMCommunicator extends AbstractService { // TODO: for other exceptions } } catch (InterruptedException e) { - LOG.warn("Allocated thread interrupted. Returning."); + if (!stopped.get()) { + LOG.warn("Allocated thread interrupted. Returning."); + } return; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index f8ebfcfc6d2..fd8fa960762 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -32,6 +32,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -84,7 +85,7 @@ public class RMContainerAllocator extends RMContainerRequestor private static final Priority PRIORITY_MAP; private Thread eventHandlingThread; - private volatile boolean stopEventHandling; + private final AtomicBoolean stopped; static { PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class); @@ -145,6 +146,7 @@ public class RMContainerAllocator extends RMContainerRequestor public RMContainerAllocator(ClientService clientService, AppContext context) { super(clientService, context); + this.stopped = new AtomicBoolean(false); } @Override @@ -176,11 +178,13 @@ public class RMContainerAllocator extends RMContainerRequestor ContainerAllocatorEvent event; - while (!stopEventHandling && !Thread.currentThread().isInterrupted()) { + while (!stopped.get() && !Thread.currentThread().isInterrupted()) { try { event = RMContainerAllocator.this.eventQueue.take(); } catch (InterruptedException e) { - LOG.error("Returning, interrupted : " + e); + if (!stopped.get()) { + LOG.error("Returning, interrupted : " + e); + } return; } @@ -234,7 +238,10 @@ public class RMContainerAllocator extends RMContainerRequestor @Override public void stop() { - this.stopEventHandling = true; + if (stopped.getAndSet(true)) { + // return if already stopped + return; + } eventHandlingThread.interrupt(); super.stop(); LOG.info("Final Stats: " + getStat()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java index 8ee1d6e2eae..6c0418ebfdd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java @@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,10 +44,12 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner { private Thread eventHandlingThread; private BlockingQueue eventQueue = new LinkedBlockingQueue(); + private final AtomicBoolean stopped; public TaskCleanerImpl(AppContext context) { super("TaskCleaner"); this.context = context; + this.stopped = new AtomicBoolean(false); } public void start() { @@ -59,11 +62,13 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner { @Override public void run() { TaskCleanupEvent event = null; - while (!Thread.currentThread().isInterrupted()) { + while (!stopped.get() && !Thread.currentThread().isInterrupted()) { try { event = eventQueue.take(); } catch (InterruptedException e) { - LOG.error("Returning, interrupted : " + e); + if (!stopped.get()) { + LOG.error("Returning, interrupted : " + e); + } return; } // the events from the queue are handled in parallel @@ -77,6 +82,10 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner { } public void stop() { + if (stopped.getAndSet(true)) { + // return if already stopped + return; + } eventHandlingThread.interrupt(); launcherPool.shutdown(); super.stop(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java index 3dd6c33edef..eee29a702d7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java @@ -21,17 +21,15 @@ package org.apache.hadoop.mapreduce.v2.app; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.times; import java.io.IOException; import junit.framework.Assert; import junit.framework.TestCase; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -49,7 +47,6 @@ import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; 
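The ContainerLauncherImpl, RMCommunicator, RMContainerAllocator and TaskCleanerImpl hunks above all apply the same stop pattern; a condensed, self-contained sketch of the idea follows (class and method names here are illustrative only, not the patch's code):

import java.util.concurrent.atomic.AtomicBoolean;

// An AtomicBoolean records that stop() was called before the event thread is
// interrupted, so an interrupt seen during a deliberate shutdown is not logged
// as an error (the MAPREDUCE-4741 symptom) and stop() itself is idempotent.
abstract class StoppableEventLoop {
  private final AtomicBoolean stopped = new AtomicBoolean(false);
  protected Thread eventHandlingThread;

  protected void runLoop() {
    while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
      try {
        handleNextEvent();
      } catch (InterruptedException e) {
        if (!stopped.get()) {
          // Only an unexpected interrupt is worth reporting.
          System.err.println("Returning, interrupted : " + e);
        }
        return;
      }
    }
  }

  public void stop() {
    if (stopped.getAndSet(true)) {
      return; // already stopped
    }
    if (eventHandlingThread != null) {
      eventHandlingThread.interrupt();
    }
  }

  protected abstract void handleNextEvent() throws InterruptedException;
}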
import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.service.AbstractService; @@ -68,7 +65,6 @@ import org.junit.Test; private Path stagingJobPath = new Path(stagingJobDir); private final static RecordFactory recordFactory = RecordFactoryProvider. getRecordFactory(null); - private static final Log LOG = LogFactory.getLog(TestStagingCleanup.class); @Test public void testDeletionofStaging() throws IOException { @@ -86,9 +82,7 @@ import org.junit.Test; jobid.setAppId(appId); MRAppMaster appMaster = new TestMRApp(attemptId); appMaster.init(conf); - EventHandler handler = - appMaster.createJobFinishEventHandler(); - handler.handle(new JobFinishEvent(jobid)); + appMaster.shutDownJob(); verify(fs).delete(stagingJobPath, true); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 88e32b337ca..0a381facaf2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -546,6 +546,105 @@ public class TestTaskAttempt{ eventHandler.internalError); } + @Test + public void testAppDiognosticEventOnUnassignedTask() throws Exception { + ApplicationId appId = BuilderUtils.newApplicationId(1, 2); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" }); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + Resource resource = mock(Resource.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + when(clusterInfo.getMinContainerCapability()).thenReturn(resource); + when(resource.getMemory()).thenReturn(1024); + + TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, + jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class), + mock(Token.class), new Credentials(), new SystemClock(), appCtx); + + NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); + ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + 
when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId, + "Task got killed")); + assertFalse( + "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task", + eventHandler.internalError); + } + + @Test + public void testAppDiognosticEventOnNewTask() throws Exception { + ApplicationId appId = BuilderUtils.newApplicationId(1, 2); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" }); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + Resource resource = mock(Resource.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + when(clusterInfo.getMinContainerCapability()).thenReturn(resource); + when(resource.getMemory()).thenReturn(1024); + + TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, + jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class), + mock(Token.class), new Credentials(), new SystemClock(), appCtx); + + NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); + ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId, + "Task got killed")); + assertFalse( + "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task", + eventHandler.internalError); + } + + public static class MockEventHandler implements EventHandler { public boolean internalError; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index 169ba4b4c0f..596802853f3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -191,6 +191,7 @@ public class MRApps extends Apps { // TODO: Remove duplicates. 
} + @SuppressWarnings("deprecation") public static void setClasspath(Map environment, Configuration conf) throws IOException { boolean userClassesTakesPrecedence = @@ -218,11 +219,66 @@ public class MRApps extends Apps { environment, Environment.CLASSPATH.name(), Environment.PWD.$() + Path.SEPARATOR + "*"); + // a * in the classpath will only find a .jar, so we need to filter out + // all .jars and add everything else + addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf), + DistributedCache.getCacheFiles(conf), + conf, + environment); + addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf), + DistributedCache.getCacheArchives(conf), + conf, + environment); if (userClassesTakesPrecedence) { MRApps.setMRFrameworkClasspath(environment, conf); } } + /** + * Add the paths to the classpath if they are not jars + * @param paths the paths to add to the classpath + * @param withLinks the corresponding paths that may have a link name in them + * @param conf used to resolve the paths + * @param environment the environment to update CLASSPATH in + * @throws IOException if there is an error resolving any of the paths. + */ + private static void addToClasspathIfNotJar(Path[] paths, + URI[] withLinks, Configuration conf, + Map environment) throws IOException { + if (paths != null) { + HashMap linkLookup = new HashMap(); + if (withLinks != null) { + for (URI u: withLinks) { + Path p = new Path(u); + FileSystem remoteFS = p.getFileSystem(conf); + p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory())); + String name = (null == u.getFragment()) + ? p.getName() : u.getFragment(); + if (!name.toLowerCase().endsWith(".jar")) { + linkLookup.put(p, name); + } + } + } + + for (Path p : paths) { + FileSystem remoteFS = p.getFileSystem(conf); + p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory())); + String name = linkLookup.get(p); + if (name == null) { + name = p.getName(); + } + if(!name.toLowerCase().endsWith(".jar")) { + Apps.addToEnvironment( + environment, + Environment.CLASSPATH.name(), + Environment.PWD.$() + Path.SEPARATOR + name); + } + } + } + } + private static final String STAGING_CONSTANT = ".staging"; public static Path getStagingAreaDir(Configuration conf, String user) { return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, @@ -261,8 +317,7 @@ public class MRApps extends Apps { DistributedCache.getCacheArchives(conf), parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)), getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), - DistributedCache.getArchiveVisibilities(conf), - DistributedCache.getArchiveClassPaths(conf)); + DistributedCache.getArchiveVisibilities(conf)); // Cache files parseDistributedCacheArtifacts(conf, @@ -271,8 +326,7 @@ public class MRApps extends Apps { DistributedCache.getCacheFiles(conf), parseTimeStamps(DistributedCache.getFileTimestamps(conf)), getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES), - DistributedCache.getFileVisibilities(conf), - DistributedCache.getFileClassPaths(conf)); + DistributedCache.getFileVisibilities(conf)); } private static String getResourceDescription(LocalResourceType type) { @@ -289,8 +343,8 @@ public class MRApps extends Apps { Configuration conf, Map localResources, LocalResourceType type, - URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], - Path[] pathsToPutOnClasspath) throws IOException { + URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[]) + throws IOException { if (uris != null) { // Sanity 
check @@ -304,15 +358,6 @@ public class MRApps extends Apps { ); } - Map classPaths = new HashMap(); - if (pathsToPutOnClasspath != null) { - for (Path p : pathsToPutOnClasspath) { - FileSystem remoteFS = p.getFileSystem(conf); - p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory())); - classPaths.put(p.toUri().getPath().toString(), p); - } - } for (int i = 0; i < uris.length; ++i) { URI u = uris[i]; Path p = new Path(u); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java index dbd3538602c..5cf515bb25e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.v2.util; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import java.util.HashMap; @@ -42,12 +44,36 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; - +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import static org.junit.Assert.*; import static org.mockito.Mockito.*; public class TestMRApps { + private static File testWorkDir = null; + + @BeforeClass + public static void setupTestDirs() throws IOException { + testWorkDir = new File("target", TestMRApps.class.getCanonicalName()); + delete(testWorkDir); + testWorkDir.mkdirs(); + testWorkDir = testWorkDir.getAbsoluteFile(); + } + + @AfterClass + public static void cleanupTestDirs() throws IOException { + if (testWorkDir != null) { + delete(testWorkDir); + } + } + + private static void delete(File dir) throws IOException { + Path p = new Path("file://"+dir.getAbsolutePath()); + Configuration conf = new Configuration(); + FileSystem fs = p.getFileSystem(conf); + fs.delete(p, true); + } @Test public void testJobIDtoString() { JobId jid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class); @@ -154,6 +180,28 @@ public class TestMRApps { } assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath)); } + + @Test public void testSetClasspathWithArchives () throws IOException { + File testTGZ = new File(testWorkDir, "test.tgz"); + FileOutputStream out = new FileOutputStream(testTGZ); + out.write(0); + out.close(); + Job job = Job.getInstance(); + Configuration conf = job.getConfiguration(); + conf.set(MRJobConfig.CLASSPATH_ARCHIVES, "file://" + + testTGZ.getAbsolutePath()); + conf.set(MRJobConfig.CACHE_ARCHIVES, "file://" + + testTGZ.getAbsolutePath() + "#testTGZ"); + Map environment = new HashMap(); + MRApps.setClasspath(environment, conf); + assertTrue(environment.get("CLASSPATH").startsWith("$PWD:")); + String confClasspath = job.getConfiguration().get(YarnConfiguration.YARN_APPLICATION_CLASSPATH); + if (confClasspath != null) { + confClasspath = confClasspath.replaceAll(",\\s*", ":").trim(); + } + assertTrue(environment.get("CLASSPATH").contains(confClasspath)); + 
assertTrue(environment.get("CLASSPATH").contains("testTGZ")); + } @Test public void testSetClasspathWithUserPrecendence() { Configuration conf = new Configuration(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java index 5d74b802189..68eac638eff 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.CounterGroup; import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.util.StringInterner; import org.apache.avro.Schema; import org.apache.avro.io.Decoder; @@ -170,9 +171,11 @@ public class EventReader implements Closeable { Counters result = new Counters(); for (JhCounterGroup g : counters.groups) { CounterGroup group = - result.addGroup(g.name.toString(), g.displayName.toString()); + result.addGroup(StringInterner.weakIntern(g.name.toString()), + StringInterner.weakIntern(g.displayName.toString())); for (JhCounter c : g.counts) { - group.addCounter(c.name.toString(), c.displayName.toString(), c.value); + group.addCounter(StringInterner.weakIntern(c.name.toString()), + StringInterner.weakIntern(c.displayName.toString()), c.value); } } return result; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java index 48c004b23b7..aa75a8e6252 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java @@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -226,10 +227,10 @@ public class JobHistoryParser { TaskAttemptInfo attemptInfo = taskInfo.attemptsMap.get(event.getAttemptId()); attemptInfo.finishTime = event.getFinishTime(); - attemptInfo.status = event.getTaskStatus(); - attemptInfo.state = event.getState(); + attemptInfo.status = StringInterner.weakIntern(event.getTaskStatus()); + attemptInfo.state = StringInterner.weakIntern(event.getState()); attemptInfo.counters = event.getCounters(); - attemptInfo.hostname = event.getHostname(); + attemptInfo.hostname = StringInterner.weakIntern(event.getHostname()); } private void handleReduceAttemptFinishedEvent @@ -238,14 +239,14 @@ public class JobHistoryParser { TaskAttemptInfo attemptInfo = taskInfo.attemptsMap.get(event.getAttemptId()); attemptInfo.finishTime 
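The EventReader and JobHistoryParser hunks here replace plain string assignments with `StringInterner.weakIntern(...)` because job-history parsing produces thousands of identical hostnames, statuses, rack names and counter names; interning them weakly keeps one shared instance per distinct value while still allowing garbage collection. A small sketch of the same usage pattern (the `AttemptInfo` holder is illustrative):

```java
import org.apache.hadoop.util.StringInterner;

// Sketch: repeated history-event fields are interned so each distinct
// value is stored once instead of once per parsed event.
public class InternSketch {
  static class AttemptInfo {
    String hostname;
    String status;
  }

  static AttemptInfo parse(String rawHost, String rawStatus) {
    AttemptInfo info = new AttemptInfo();
    info.hostname = StringInterner.weakIntern(rawHost);
    info.status = StringInterner.weakIntern(rawStatus);
    return info;
  }

  public static void main(String[] args) {
    AttemptInfo a = parse(new String("node-17"), new String("SUCCEEDED"));
    AttemptInfo b = parse(new String("node-17"), new String("SUCCEEDED"));
    // Both parses share the same interned instances.
    System.out.println(a.hostname == b.hostname);  // true
    System.out.println(a.status == b.status);      // true
  }
}
```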
= event.getFinishTime(); - attemptInfo.status = event.getTaskStatus(); - attemptInfo.state = event.getState(); + attemptInfo.status = StringInterner.weakIntern(event.getTaskStatus()); + attemptInfo.state = StringInterner.weakIntern(event.getState()); attemptInfo.shuffleFinishTime = event.getShuffleFinishTime(); attemptInfo.sortFinishTime = event.getSortFinishTime(); attemptInfo.counters = event.getCounters(); - attemptInfo.hostname = event.getHostname(); + attemptInfo.hostname = StringInterner.weakIntern(event.getHostname()); attemptInfo.port = event.getPort(); - attemptInfo.rackname = event.getRackName(); + attemptInfo.rackname = StringInterner.weakIntern(event.getRackName()); } private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) { @@ -253,13 +254,13 @@ public class JobHistoryParser { TaskAttemptInfo attemptInfo = taskInfo.attemptsMap.get(event.getAttemptId()); attemptInfo.finishTime = event.getFinishTime(); - attemptInfo.status = event.getTaskStatus(); - attemptInfo.state = event.getState(); + attemptInfo.status = StringInterner.weakIntern(event.getTaskStatus()); + attemptInfo.state = StringInterner.weakIntern(event.getState()); attemptInfo.mapFinishTime = event.getMapFinishTime(); attemptInfo.counters = event.getCounters(); - attemptInfo.hostname = event.getHostname(); + attemptInfo.hostname = StringInterner.weakIntern(event.getHostname()); attemptInfo.port = event.getPort(); - attemptInfo.rackname = event.getRackName(); + attemptInfo.rackname = StringInterner.weakIntern(event.getRackName()); } private void handleTaskAttemptFailedEvent( @@ -269,10 +270,10 @@ public class JobHistoryParser { taskInfo.attemptsMap.get(event.getTaskAttemptId()); attemptInfo.finishTime = event.getFinishTime(); attemptInfo.error = event.getError(); - attemptInfo.status = event.getTaskStatus(); - attemptInfo.hostname = event.getHostname(); + attemptInfo.status = StringInterner.weakIntern(event.getTaskStatus()); + attemptInfo.hostname = StringInterner.weakIntern(event.getHostname()); attemptInfo.port = event.getPort(); - attemptInfo.rackname = event.getRackName(); + attemptInfo.rackname = StringInterner.weakIntern(event.getRackName()); attemptInfo.shuffleFinishTime = event.getFinishTime(); attemptInfo.sortFinishTime = event.getFinishTime(); attemptInfo.mapFinishTime = event.getFinishTime(); @@ -300,7 +301,7 @@ public class JobHistoryParser { attemptInfo.startTime = event.getStartTime(); attemptInfo.attemptId = event.getTaskAttemptId(); attemptInfo.httpPort = event.getHttpPort(); - attemptInfo.trackerName = event.getTrackerName(); + attemptInfo.trackerName = StringInterner.weakIntern(event.getTrackerName()); attemptInfo.taskType = event.getTaskType(); attemptInfo.shufflePort = event.getShufflePort(); attemptInfo.containerId = event.getContainerId(); @@ -344,7 +345,7 @@ public class JobHistoryParser { info.finishTime = event.getFinishTime(); info.finishedMaps = event.getFinishedMaps(); info.finishedReduces = event.getFinishedReduces(); - info.jobStatus = event.getStatus(); + info.jobStatus = StringInterner.weakIntern(event.getStatus()); } private void handleJobFinishedEvent(JobFinishedEvent event) { @@ -375,7 +376,7 @@ public class JobHistoryParser { amInfo.appAttemptId = event.getAppAttemptId(); amInfo.startTime = event.getStartTime(); amInfo.containerId = event.getContainerId(); - amInfo.nodeManagerHost = event.getNodeManagerHost(); + amInfo.nodeManagerHost = StringInterner.weakIntern(event.getNodeManagerHost()); amInfo.nodeManagerPort = event.getNodeManagerPort(); 
amInfo.nodeManagerHttpPort = event.getNodeManagerHttpPort(); if (info.amInfos == null) { @@ -393,11 +394,11 @@ public class JobHistoryParser { private void handleJobSubmittedEvent(JobSubmittedEvent event) { info.jobid = event.getJobId(); info.jobname = event.getJobName(); - info.username = event.getUserName(); + info.username = StringInterner.weakIntern(event.getUserName()); info.submitTime = event.getSubmitTime(); info.jobConfPath = event.getJobConfPath(); info.jobACLs = event.getJobAcls(); - info.jobQueueName = event.getJobQueueName(); + info.jobQueueName = StringInterner.weakIntern(event.getJobQueueName()); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/CountersStrings.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/CountersStrings.java index a0e542ac302..ce799f5f2b6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/CountersStrings.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/CountersStrings.java @@ -28,6 +28,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.counters.AbstractCounters; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.counters.CounterGroupBase; +import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.util.StringUtils; /** @@ -235,13 +236,13 @@ public class CountersStrings { // Get the actual name String groupName = - getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex); - groupName = unescape(groupName); + StringInterner.weakIntern(getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex)); + groupName = StringInterner.weakIntern(unescape(groupName)); // Get the display name String groupDisplayName = - getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex); - groupDisplayName = unescape(groupDisplayName); + StringInterner.weakIntern(getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex)); + groupDisplayName = StringInterner.weakIntern(unescape(groupDisplayName)); // Get the counters G group = counters.getGroup(groupName); @@ -255,13 +256,13 @@ public class CountersStrings { // Get the actual name String counterName = - getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex); - counterName = unescape(counterName); + StringInterner.weakIntern(getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex)); + counterName = StringInterner.weakIntern(unescape(counterName)); // Get the display name String counterDisplayName = - getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex); - counterDisplayName = unescape(counterDisplayName); + StringInterner.weakIntern(getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex)); + counterDisplayName = StringInterner.weakIntern(unescape(counterDisplayName)); // Get the value long value = diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 2b0bc80e698..5ca37e253f4 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -86,7 +86,13 @@ Release 2.0.3-alpha - Unreleased YARN-150. Fixes AppRejectedTransition does not unregister a rejected app-attempt from the ApplicationMasterService (Bikas Saha via sseth) - YARN-140. Add capacity-scheduler-default.xml to provide a default set of configurations for the capacity scheduler. 
(ahmed via tucu) + YARN-140. Add capacity-scheduler-default.xml to provide a default set of + configurations for the capacity scheduler. (ahmed via tucu) + + YARN-179. Fix some unit test failures. (Vinod Kumar Vavilapalli via sseth) + + YARN-181. Fixed eclipse settings broken by capacity-scheduler.xml move via + YARN-140. (Siddharth Seth via vinodkv) Release 2.0.2-alpha - 2012-09-07 @@ -165,6 +171,20 @@ Release 0.23.5 - UNRELEASED YARN-163. Retrieving container log via NM webapp can hang with multibyte characters in log (jlowe via bobby) + YARN-174. Modify NodeManager to pass the user's configuration even when + rebooting. (vinodkv) + + YARN-177. CapacityScheduler - adding a queue while the RM is running has + wacky results (acmurthy vai tgraves) + + YARN-178. Fix custom ProcessTree instance creation (Radim Kolar via bobby) + + YARN-180. Capacity scheduler - containers that get reserved create + container token to early (acmurthy and bobby) + + YARN-139. Interrupted Exception within AsyncDispatcher leads to user + confusion. (Vinod Kumar Vavilapalli via jlowe) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml index eee1667c57a..0dc93d037d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml @@ -51,6 +51,12 @@ hadoop-yarn-server-resourcemanager test + + org.apache.hadoop + hadoop-yarn-server-resourcemanager + test + test-jar + org.apache.hadoop hadoop-yarn-server-common diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java index 7950bd155c1..b9e06256a82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java @@ -143,7 +143,7 @@ public class UnmanagedAMLauncher { appName = cliParser.getOptionValue("appname", "UnmanagedAM"); amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); - amQueue = cliParser.getOptionValue("queue", ""); + amQueue = cliParser.getOptionValue("queue", "default"); classpath = cliParser.getOptionValue("classpath", null); amCmd = cliParser.getOptionValue("cmd"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java index 0c417bb7cba..199f64d36eb 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java @@ -141,6 +141,8 @@ public class TestUnmanagedAMLauncher { String[] args = { "--classpath", classpath, + "--queue", + "default", "--cmd", javaHome + "/bin/java -Xmx512m " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml index 45fbc0070c7..10cf85b657c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml @@ -38,5 +38,12 @@ hadoop-yarn-server-resourcemanager test + + org.apache.hadoop + hadoop-yarn-server-resourcemanager + test + test-jar + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index c8f325df244..9377397e489 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -68,7 +68,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { try { event = eventQueue.take(); } catch(InterruptedException ie) { - LOG.warn("AsyncDispatcher thread interrupted", ie); + if (!stopped) { + LOG.warn("AsyncDispatcher thread interrupted", ie); + } return; } if (event != null) { @@ -180,7 +182,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { try { eventQueue.put(event); } catch (InterruptedException e) { - LOG.warn("AsyncDispatcher thread interrupted", e); + if (!stopped) { + LOG.warn("AsyncDispatcher thread interrupted", e); + } throw new YarnException(e); } }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java index 01ad5094c25..158dcd5adcd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java @@ -114,6 +114,7 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree { * @param procfsDir the root of a proc file system - only used for testing. 
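The AsyncDispatcher hunks guard the "thread interrupted" warning with a check of the service's stopped flag, because the interrupt is expected during an orderly shutdown and only worth logging when it is not. A sketch of that pattern with an illustrative worker class (the real dispatcher sets its flag in `stop()` before interrupting the event-handling thread):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the "quiet shutdown" pattern: an InterruptedException is only
// worth a warning if it was *not* caused by an orderly stop().
public class QuietShutdownSketch {
  private final BlockingQueue<Runnable> eventQueue =
      new LinkedBlockingQueue<Runnable>();
  private volatile boolean stopped = false;
  private final Thread eventHandler = new Thread(new Runnable() {
    @Override
    public void run() {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        Runnable event;
        try {
          event = eventQueue.take();
        } catch (InterruptedException ie) {
          if (!stopped) {
            // Unexpected interrupt: the user should see this.
            System.err.println("dispatcher thread interrupted: " + ie);
          }
          return;          // expected during stop(): exit silently
        }
        event.run();
      }
    }
  });

  public void start() { eventHandler.start(); }

  public void stop() throws InterruptedException {
    stopped = true;                 // set the flag *before* interrupting
    eventHandler.interrupt();
    eventHandler.join();
  }

  public static void main(String[] args) throws Exception {
    QuietShutdownSketch d = new QuietShutdownSketch();
    d.start();
    d.stop();                       // no spurious warning is printed
  }
}
```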
*/ public ProcfsBasedProcessTree(String pid, String procfsDir) { + super(pid); this.pid = getValidPID(pid); this.procfsDir = procfsDir; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java index 0146b592646..e5b4e87a2b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java @@ -21,16 +21,27 @@ package org.apache.hadoop.yarn.util; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; import org.apache.hadoop.util.ReflectionUtils; +import java.lang.reflect.Constructor; /** * Interface class to obtain process resource usage * */ -public abstract class ResourceCalculatorProcessTree { +public abstract class ResourceCalculatorProcessTree extends Configured { static final Log LOG = LogFactory .getLog(ResourceCalculatorProcessTree.class); + /** + * Create process-tree instance with specified root process. + * + * Subclass must override this. + * @param root process-tree root-process + */ + public ResourceCalculatorProcessTree(String root) { + } + /** * Get the process-tree with latest state. If the root-process is not alive, * an empty tree will be returned. @@ -122,10 +133,17 @@ public abstract class ResourceCalculatorProcessTree { * is not available for this system. */ public static ResourceCalculatorProcessTree getResourceCalculatorProcessTree( - String pid, Class clazz, Configuration conf) { + String pid, Class clazz, Configuration conf) { if (clazz != null) { - return ReflectionUtils.newInstance(clazz, conf); + try { + Constructor c = clazz.getConstructor(String.class); + ResourceCalculatorProcessTree rctree = c.newInstance(pid); + rctree.setConf(conf); + return rctree; + } catch(Exception e) { + throw new RuntimeException(e); + } } // No class given, try a os specific class diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestResourceCalculatorProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestResourceCalculatorProcessTree.java new file mode 100644 index 00000000000..68b20c97984 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestResourceCalculatorProcessTree.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
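The `ResourceCalculatorProcessTree` change above adds a `(String root)` constructor that every subclass must provide, and `getResourceCalculatorProcessTree` now builds the plugin reflectively through that constructor and calls `setConf` afterwards, instead of relying on no-arg instantiation. A self-contained sketch of that construction pattern; the plugin classes here are stand-ins, not YARN types:

```java
import java.lang.reflect.Constructor;

// Sketch of the reflective construction added to
// getResourceCalculatorProcessTree: plugins are built through a
// (String pid) constructor and configured afterwards.
public class PluginFactorySketch {

  public abstract static class ProcessTreePlugin {
    protected final String pid;
    protected ProcessTreePlugin(String pid) { this.pid = pid; }
    public abstract String dump();
  }

  public static class EchoPlugin extends ProcessTreePlugin {
    public EchoPlugin(String pid) { super(pid); }
    @Override public String dump() { return "tree rooted at pid " + pid; }
  }

  static <T extends ProcessTreePlugin> T create(Class<T> clazz, String pid) {
    try {
      Constructor<T> c = clazz.getConstructor(String.class);
      return c.newInstance(pid);
    } catch (Exception e) {
      // The real code likewise wraps reflection failures in a RuntimeException.
      throw new RuntimeException("cannot construct " + clazz.getName(), e);
    }
  }

  public static void main(String[] args) {
    ProcessTreePlugin tree = create(EchoPlugin.class, "4242");
    System.out.println(tree.dump());   // tree rooted at pid 4242
  }
}
```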
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.util; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; +import static org.junit.Assert.*; +import static org.hamcrest.core.IsInstanceOf.*; +import static org.hamcrest.core.IsSame.*; + +/** + * A JUnit test to test {@link ResourceCalculatorPlugin} + */ +public class TestResourceCalculatorProcessTree { + + public static class EmptyProcessTree extends ResourceCalculatorProcessTree { + + public EmptyProcessTree(String pid) { + super(pid); + } + + public ResourceCalculatorProcessTree getProcessTree() { + return this; + } + + public String getProcessTreeDump() { + return "Empty tree for testing"; + } + + public long getCumulativeRssmem(int age) { + return 0; + } + + public long getCumulativeVmem(int age) { + return 0; + } + + public long getCumulativeCpuTime() { + return 0; + } + + public boolean checkPidPgrpidForMatch() { + return false; + } + } + + @Test + public void testCreateInstance() { + ResourceCalculatorProcessTree tree; + tree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree("1", EmptyProcessTree.class, new Configuration()); + assertNotNull(tree); + assertThat(tree, instanceOf(EmptyProcessTree.class)); + } + + @Test + public void testCreatedInstanceConfigured() { + ResourceCalculatorProcessTree tree; + Configuration conf = new Configuration(); + tree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree("1", EmptyProcessTree.class, conf); + assertNotNull(tree); + assertThat(tree.getConf(), sameInstance(conf)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 0a68f41c84d..cbb3d2914e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -253,12 +253,12 @@ public class NodeManager extends CompositeService implements if (hasToReboot) { LOG.info("Rebooting the node manager."); NodeManager nodeManager = createNewNodeManager(); - nodeManager.initAndStartNodeManager(hasToReboot); + nodeManager.initAndStartNodeManager(this.getConfig(), hasToReboot); } } } - private void initAndStartNodeManager(boolean hasToReboot) { + private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) { try { // Remove the old hook if we are rebooting. 
@@ -270,7 +270,6 @@ public class NodeManager extends CompositeService implements ShutdownHookManager.get().addShutdownHook(nodeManagerShutdownHook, SHUTDOWN_HOOK_PRIORITY); - YarnConfiguration conf = new YarnConfiguration(); this.init(conf); this.start(); } catch (Throwable t) { @@ -288,6 +287,7 @@ public class NodeManager extends CompositeService implements Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); StringUtils.startupShutdownMessage(NodeManager.class, args, LOG); NodeManager nodeManager = new NodeManager(); - nodeManager.initAndStartNodeManager(false); + Configuration conf = new YarnConfiguration(); + nodeManager.initAndStartNodeManager(conf, false); } } diff --git a/hadoop-yarn-project/hadoop-yarn/conf/capacity-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml similarity index 100% rename from hadoop-yarn-project/hadoop-yarn/conf/capacity-scheduler.xml rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index 503d0e20679..e015f01634e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -50,7 +50,7 @@ ${basedir}/src/test/resources - ${basedir}/../../conf + ${basedir}/conf capacity-scheduler.xml diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 8a43b2da27b..d21a888cf91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -50,6 +50,12 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { */ public CSQueue getParent(); + /** + * Set the parent Queue. + * @param newParentQueue new parent queue + */ + public void setParent(CSQueue newParentQueue); + /** * Get the queue name. * @return the queue name @@ -195,10 +201,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { /** * Reinitialize the queue. 
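The NodeManager hunks above make `initAndStartNodeManager` take a `Configuration` and pass `this.getConfig()` on reboot, so the configuration the node manager was started with (including any test or user overrides) survives the restart instead of being replaced by a freshly constructed default. A minimal sketch of that idea, assuming simplified stand-in types rather than the YARN service classes:

```java
// Sketch of the "reuse the service's configuration on reboot" idea from
// the NodeManager change. Config/MiniService are stand-ins, not YARN APIs.
public class RebootSketch {

  static class Config {
    final String source;
    Config(String source) { this.source = source; }
  }

  static class MiniService {
    private Config conf;

    void initAndStart(Config conf) {
      this.conf = conf;
      System.out.println("started with config from: " + conf.source);
    }

    Config getConfig() { return conf; }

    // On reboot, hand the *current* configuration to the replacement
    // instance instead of constructing a fresh default one.
    MiniService reboot() {
      MiniService next = new MiniService();
      next.initAndStart(this.getConfig());
      return next;
    }
  }

  public static void main(String[] args) {
    MiniService nm = new MiniService();
    nm.initAndStart(new Config("unit-test overrides"));
    nm.reboot();   // prints the same source: the config was preserved
  }
}
```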
- * @param queue new queue to re-initalize from + * @param newlyParsedQueue new queue to re-initalize from * @param clusterResource resources in the cluster */ - public void reinitialize(CSQueue queue, Resource clusterResource) + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException; /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index df01f59f4c1..4534b2746b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -45,7 +45,6 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; @@ -111,21 +110,18 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable { } }; + @Override public void setConf(Configuration conf) { - if (conf instanceof YarnConfiguration) { - yarnConf = (YarnConfiguration) conf; - } else { - throw new IllegalArgumentException("Can only configure with " + - "YarnConfiguration"); - } + yarnConf = conf; } + @Override public Configuration getConf() { return yarnConf; } private CapacitySchedulerConfiguration conf; - private YarnConfiguration yarnConf; + private Configuration yarnConf; private RMContext rmContext; private Map queues = new ConcurrentHashMap(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index d222b9061a8..e183a2d7020 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -223,7 +223,7 @@ public class LeafQueue implements CSQueue { { // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); - float absCapacity = parent.getAbsoluteCapacity() * capacity; + float absCapacity = getParent().getAbsoluteCapacity() * capacity; CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absCapacity, absoluteMaxCapacity); this.capacity = capacity; @@ -256,7 +256,7 @@ public class LeafQueue implements CSQueue { // Update metrics 
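The CapacityScheduler hunk relaxes `setConf` to store any `Configuration` rather than rejecting everything that is not a `YarnConfiguration`, which is what lets tests and callers configure the scheduler without the IllegalArgumentException. A small sketch of that before/after contract, with illustrative classes standing in for the Hadoop types:

```java
// Sketch of the setConf relaxation: a Configurable component stores
// whatever Configuration it is handed instead of demanding a subclass.
public class ConfigurableSketch {

  static class Configuration { }
  static class YarnLikeConfiguration extends Configuration { }

  interface Configurable {
    void setConf(Configuration conf);
    Configuration getConf();
  }

  static class Scheduler implements Configurable {
    private Configuration conf;

    @Override
    public void setConf(Configuration conf) {
      // Before the change this rejected anything that was not the
      // YARN-specific subclass; now any Configuration is accepted.
      this.conf = conf;
    }

    @Override
    public Configuration getConf() { return conf; }
  }

  public static void main(String[] args) {
    Scheduler s = new Scheduler();
    s.setConf(new Configuration());          // plain configuration is fine
    s.setConf(new YarnLikeConfiguration());  // subclass still works
    System.out.println(s.getConf().getClass().getSimpleName());
  }
}
```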
CSQueueUtils.updateQueueStatistics( - this, parent, clusterResource, minimumAllocation); + this, getParent(), clusterResource, minimumAllocation); LOG.info("Initializing " + queueName + "\n" + "capacity = " + capacity + @@ -339,10 +339,15 @@ public class LeafQueue implements CSQueue { } @Override - public CSQueue getParent() { + public synchronized CSQueue getParent() { return parent; } - + + @Override + public synchronized void setParent(CSQueue newParentQueue) { + this.parent = (ParentQueue)newParentQueue; + } + @Override public String getQueueName() { return queueName; @@ -350,7 +355,7 @@ public class LeafQueue implements CSQueue { @Override public String getQueuePath() { - return parent.getQueuePath() + "." + getQueueName(); + return getParent().getQueuePath() + "." + getQueueName(); } /** @@ -430,7 +435,9 @@ public class LeafQueue implements CSQueue { synchronized void setMaxCapacity(float maximumCapacity) { // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); - float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent); + float absMaxCapacity = + CSQueueUtils.computeAbsoluteMaximumCapacity( + maximumCapacity, getParent()); CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity); this.maximumCapacity = maximumCapacity; @@ -453,10 +460,6 @@ public class LeafQueue implements CSQueue { this.userLimitFactor = userLimitFactor; } - synchronized void setParentQueue(CSQueue parent) { - this.parent = parent; - } - @Override public synchronized int getNumApplications() { return getNumPendingApplications() + getNumActiveApplications(); @@ -559,26 +562,28 @@ public class LeafQueue implements CSQueue { } @Override - public synchronized void reinitialize(CSQueue queue, Resource clusterResource) + public synchronized void reinitialize( + CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { // Sanity check - if (!(queue instanceof LeafQueue) || - !queue.getQueuePath().equals(getQueuePath())) { + if (!(newlyParsedQueue instanceof LeafQueue) || + !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { throw new IOException("Trying to reinitialize " + getQueuePath() + - " from " + queue.getQueuePath()); + " from " + newlyParsedQueue.getQueuePath()); } - LeafQueue leafQueue = (LeafQueue)queue; + LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue; setupQueueConfigs( clusterResource, - leafQueue.capacity, leafQueue.absoluteCapacity, - leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity, - leafQueue.userLimit, leafQueue.userLimitFactor, - leafQueue.maxApplications, - leafQueue.getMaxApplicationsPerUser(), - leafQueue.getMaximumActiveApplications(), - leafQueue.getMaximumActiveApplicationsPerUser(), - leafQueue.state, leafQueue.acls); + newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity, + newlyParsedLeafQueue.maximumCapacity, + newlyParsedLeafQueue.absoluteMaxCapacity, + newlyParsedLeafQueue.userLimit, newlyParsedLeafQueue.userLimitFactor, + newlyParsedLeafQueue.maxApplications, + newlyParsedLeafQueue.getMaxApplicationsPerUser(), + newlyParsedLeafQueue.getMaximumActiveApplications(), + newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(), + newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls); } @Override @@ -591,7 +596,7 @@ public class LeafQueue implements CSQueue { } // Check if parent-queue allows access - return parent.hasAccess(acl, user); + return getParent().hasAccess(acl, user); } @Override @@ -649,10 +654,10 @@ public class LeafQueue 
implements CSQueue { // Inform the parent queue try { - parent.submitApplication(application, userName, queue); + getParent().submitApplication(application, userName, queue); } catch (AccessControlException ace) { LOG.info("Failed to submit application to parent-queue: " + - parent.getQueuePath(), ace); + getParent().getQueuePath(), ace); removeApplication(application, user); throw ace; } @@ -708,7 +713,7 @@ public class LeafQueue implements CSQueue { } // Inform the parent queue - parent.finishApplication(application, queue); + getParent().finishApplication(application, queue); } public synchronized void removeApplication(FiCaSchedulerApp application, User user) { @@ -1183,34 +1188,32 @@ public class LeafQueue implements CSQueue { return (rmContainer != null) ? rmContainer.getContainer() : createContainer(application, node, capability, priority); } - - public Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node, + Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node, Resource capability, Priority priority) { - + NodeId nodeId = node.getRMNode().getNodeID(); ContainerId containerId = BuilderUtils.newContainerId(application .getApplicationAttemptId(), application.getNewContainerId()); - ContainerToken containerToken = null; - - // If security is enabled, send the container-tokens too. - if (UserGroupInformation.isSecurityEnabled()) { - containerToken = - containerTokenSecretManager.createContainerToken(containerId, nodeId, - application.getUser(), capability); - if (containerToken == null) { - return null; // Try again later. - } - } - + // Create the container Container container = BuilderUtils.newContainer(containerId, nodeId, node.getRMNode().getHttpAddress(), capability, priority, - containerToken); - + null); + return container; } - + + /** + * Create ContainerToken, only in secure-mode + */ + ContainerToken createContainerToken( + FiCaSchedulerApp application, Container container) { + return containerTokenSecretManager.createContainerToken( + container.getId(), container.getNodeId(), + application.getUser(), container.getResource()); + } + private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, ResourceRequest request, NodeType type, RMContainer rmContainer) { @@ -1246,6 +1249,17 @@ public class LeafQueue implements CSQueue { unreserve(application, priority, node, rmContainer); } + // Create container tokens in secure-mode + if (UserGroupInformation.isSecurityEnabled()) { + ContainerToken containerToken = + createContainerToken(application, container); + if (containerToken == null) { + // Something went wrong... 
+ return Resources.none(); + } + container.setContainerToken(containerToken); + } + // Inform the application RMContainer allocatedContainer = application.allocate(type, node, priority, request, container); @@ -1351,7 +1365,7 @@ public class LeafQueue implements CSQueue { } // Inform the parent queue - parent.completedContainer(clusterResource, application, + getParent().completedContainer(clusterResource, application, node, rmContainer, null, event); } } @@ -1361,7 +1375,7 @@ public class LeafQueue implements CSQueue { // Update queue metrics Resources.addTo(usedResources, resource); CSQueueUtils.updateQueueStatistics( - this, parent, clusterResource, minimumAllocation); + this, getParent(), clusterResource, minimumAllocation); ++numContainers; // Update user metrics @@ -1386,7 +1400,7 @@ public class LeafQueue implements CSQueue { // Update queue metrics Resources.subtractFrom(usedResources, resource); CSQueueUtils.updateQueueStatistics( - this, parent, clusterResource, minimumAllocation); + this, getParent(), clusterResource, minimumAllocation); --numContainers; // Update user metrics @@ -1417,7 +1431,7 @@ public class LeafQueue implements CSQueue { // Update metrics CSQueueUtils.updateQueueStatistics( - this, parent, clusterResource, minimumAllocation); + this, getParent(), clusterResource, minimumAllocation); // Update application properties for (FiCaSchedulerApp application : activeApplications) { @@ -1488,7 +1502,7 @@ public class LeafQueue implements CSQueue { synchronized (this) { allocateResource(clusterResource, application, container.getResource()); } - parent.recoverContainer(clusterResource, application, container); + getParent().recoverContainer(clusterResource, application, container); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 25e982bbc03..75fcbde516c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -60,7 +60,7 @@ public class ParentQueue implements CSQueue { private static final Log LOG = LogFactory.getLog(ParentQueue.class); - private final CSQueue parent; + private CSQueue parent; private final String queueName; private float capacity; @@ -216,10 +216,15 @@ public class ParentQueue implements CSQueue { } @Override - public CSQueue getParent() { + public synchronized CSQueue getParent() { return parent; } + @Override + public synchronized void setParent(CSQueue newParentQueue) { + this.parent = (ParentQueue)newParentQueue; + } + @Override public String getQueueName() { return queueName; @@ -357,37 +362,52 @@ public class ParentQueue implements CSQueue { } @Override - public synchronized void reinitialize(CSQueue queue, Resource clusterResource) + public synchronized void reinitialize( + CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { // Sanity check - if (!(queue instanceof ParentQueue) || - !queue.getQueuePath().equals(getQueuePath())) { + if (!(newlyParsedQueue instanceof 
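The LeafQueue hunks move ContainerToken creation out of `createContainer` and into the secure-mode branch of container assignment, so a token is minted only for a container that is actually being handed out; if the token cannot be created, the allocation attempt is abandoned with `Resources.none()` rather than a stale token being attached to a reserved container. A hedged sketch of that ordering; the container/token/factory types below are stand-ins, not YARN classes:

```java
// Sketch of the "create the token only when the container is really
// assigned" ordering from the LeafQueue change.
public class DeferredTokenSketch {

  static class Container {
    final String id;
    String token;                      // filled in only at assignment time
    Container(String id) { this.id = id; }
  }

  interface TokenFactory {
    // Returns null when the token cannot be created yet,
    // mirroring the "try again later" case.
    String create(String containerId);
  }

  static boolean securityEnabled = true;

  /** Returns the assigned container, or null if assignment must be retried. */
  static Container assignContainer(String containerId, TokenFactory tokens) {
    Container container = new Container(containerId);   // no token yet
    if (securityEnabled) {
      String token = tokens.create(container.id);
      if (token == null) {
        return null;                   // give up this allocation attempt
      }
      container.token = token;
    }
    return container;
  }

  public static void main(String[] args) {
    Container c = assignContainer("container_1_0001_01_000001",
        new TokenFactory() {
          @Override public String create(String id) { return "tok-" + id; }
        });
    System.out.println(c.id + " -> " + c.token);
  }
}
```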
ParentQueue) || + !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { throw new IOException("Trying to reinitialize " + getQueuePath() + - " from " + queue.getQueuePath()); + " from " + newlyParsedQueue.getQueuePath()); } - ParentQueue parentQueue = (ParentQueue)queue; + ParentQueue newlyParsedParentQueue = (ParentQueue)newlyParsedQueue; // Set new configs setupQueueConfigs(clusterResource, - parentQueue.capacity, parentQueue.absoluteCapacity, - parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity, - parentQueue.state, parentQueue.acls); + newlyParsedParentQueue.capacity, + newlyParsedParentQueue.absoluteCapacity, + newlyParsedParentQueue.maximumCapacity, + newlyParsedParentQueue.absoluteMaxCapacity, + newlyParsedParentQueue.state, + newlyParsedParentQueue.acls); // Re-configure existing child queues and add new ones // The CS has already checked to ensure all existing child queues are present! Map currentChildQueues = getQueues(childQueues); - Map newChildQueues = getQueues(parentQueue.childQueues); + Map newChildQueues = + getQueues(newlyParsedParentQueue.childQueues); for (Map.Entry e : newChildQueues.entrySet()) { String newChildQueueName = e.getKey(); CSQueue newChildQueue = e.getValue(); CSQueue childQueue = currentChildQueues.get(newChildQueueName); - if (childQueue != null){ + + // Check if the child-queue already exists + if (childQueue != null) { + // Re-init existing child queues childQueue.reinitialize(newChildQueue, clusterResource); LOG.info(getQueueName() + ": re-configured queue: " + childQueue); } else { + // New child queue, do not re-init + + // Set parent to 'this' + newChildQueue.setParent(this); + + // Save in list of current child queues currentChildQueues.put(newChildQueueName, newChildQueue); + LOG.info(getQueueName() + ": added new child queue: " + newChildQueue); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 8a7b89eaf2f..e7af5afcc20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -378,4 +378,43 @@ public class TestCapacityScheduler { Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory()); } + + @Test + public void testRefreshQueuesWithNewQueue() throws Exception { + CapacityScheduler cs = new CapacityScheduler(); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + cs.setConf(new YarnConfiguration()); + cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new ClientToAMTokenSecretManagerInRM())); + checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + // Add a new queue b4 + String B4 = B + ".b4"; + float B4_CAPACITY = 10; + + B3_CAPACITY -= B4_CAPACITY; + try { + conf.setCapacity(A, 80f); + conf.setCapacity(B, 20f); + conf.setQueues(B, new String[] {"b1", "b2", "b3", "b4"}); + conf.setCapacity(B1, 
B1_CAPACITY); + conf.setCapacity(B2, B2_CAPACITY); + conf.setCapacity(B3, B3_CAPACITY); + conf.setCapacity(B4, B4_CAPACITY); + cs.reinitialize(conf,null); + checkQueueCapacities(cs, 80f, 20f); + + // Verify parent for B4 + CSQueue rootQueue = cs.getRootQueue(); + CSQueue queueB = findQueue(rootQueue, B); + CSQueue queueB4 = findQueue(queueB, B4); + + assertEquals(queueB, queueB4.getParent()); + } finally { + B3_CAPACITY += B4_CAPACITY; + } + } + }
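The new `testRefreshQueuesWithNewQueue` above exercises the ParentQueue.reinitialize change: children that already exist are re-initialized in place, while a queue that appears only in the newly parsed configuration has its parent re-pointed to the live parent and is added to the live child map, which is why adding `b4` while the RM is running now works. A sketch of that merge step, assuming simplified illustrative types:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the child-queue merge performed in ParentQueue.reinitialize:
// existing children are refreshed in place, brand-new children are adopted
// (parent re-pointed to 'this') and added to the live map.
public class QueueRefreshSketch {

  static class Queue {
    final String name;
    Queue parent;
    Map<String, Queue> children = new HashMap<String, Queue>();

    Queue(String name) { this.name = name; }

    void reinitialize(Queue newlyParsed) {
      for (Map.Entry<String, Queue> e : newlyParsed.children.entrySet()) {
        Queue existing = children.get(e.getKey());
        if (existing != null) {
          existing.reinitialize(e.getValue());      // refresh in place
        } else {
          Queue added = e.getValue();
          added.parent = this;                      // adopt the new child
          children.put(e.getKey(), added);
        }
      }
    }
  }

  public static void main(String[] args) {
    Queue liveB = new Queue("b");
    liveB.children.put("b1", new Queue("b1"));

    Queue parsedB = new Queue("b");                 // freshly parsed config
    parsedB.children.put("b1", new Queue("b1"));
    parsedB.children.put("b4", new Queue("b4"));    // new queue in the config

    liveB.reinitialize(parsedB);
    System.out.println(liveB.children.get("b4").parent.name);   // b
  }
}
```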