diff --git a/dev-support/test-patch.sh b/dev-support/test-patch.sh index cbeb81987e7..3905c5a63b4 100755 --- a/dev-support/test-patch.sh +++ b/dev-support/test-patch.sh @@ -454,7 +454,7 @@ checkJavadocWarnings () { JIRA_COMMENT="$JIRA_COMMENT {color:red}-1 javadoc{color}. The javadoc tool appears to have generated `expr $(($numPatchJavadocWarnings-$numTrunkJavadocWarnings))` warning messages. - See $BUILD_URL/artifact/trunk/patchprocess/diffJavadocWarnings.txt for details." + See $BUILD_URL/artifact/PreCommit-HADOOP-Build-patchprocess/diffJavadocWarnings.txt for details." return 1 fi fi @@ -498,7 +498,7 @@ checkJavacWarnings () { {color:red}-1 javac{color}. The applied patch generated $patchJavacWarnings javac compiler warnings (more than the trunk's current $trunkJavacWarnings warnings)." $DIFF $PATCH_DIR/filteredTrunkJavacWarnings.txt $PATCH_DIR/filteredPatchJavacWarnings.txt > $PATCH_DIR/diffJavacWarnings.txt - JIRA_COMMENT_FOOTER="Javac warnings: $BUILD_URL/artifact/trunk/patchprocess/diffJavacWarnings.txt + JIRA_COMMENT_FOOTER="Javac warnings: $BUILD_URL/artifact/PreCommit-HADOOP-Build-patchprocess/diffJavacWarnings.txt $JIRA_COMMENT_FOOTER" return 1 @@ -540,7 +540,7 @@ checkReleaseAuditWarnings () { {color:red}-1 release audit{color}. The applied patch generated $patchReleaseAuditWarnings release audit warnings." $GREP '\!?????' $PATCH_DIR/patchReleaseAuditWarnings.txt > $PATCH_DIR/patchReleaseAuditProblems.txt echo "Lines that start with ????? in the release audit report indicate files that do not have an Apache license header." >> $PATCH_DIR/patchReleaseAuditProblems.txt - JIRA_COMMENT_FOOTER="Release audit warnings: $BUILD_URL/artifact/trunk/patchprocess/patchReleaseAuditProblems.txt + JIRA_COMMENT_FOOTER="Release audit warnings: $BUILD_URL/artifact/PreCommit-HADOOP-Build-patchprocess/patchReleaseAuditProblems.txt $JIRA_COMMENT_FOOTER" return 1 fi @@ -659,7 +659,7 @@ checkFindbugsWarnings () { $PATCH_DIR/newPatchFindbugsWarnings${module_suffix}.xml \ $PATCH_DIR/newPatchFindbugsWarnings${module_suffix}.html if [[ $newFindbugsWarnings > 0 ]] ; then - JIRA_COMMENT_FOOTER="Findbugs warnings: $BUILD_URL/artifact/trunk/patchprocess/newPatchFindbugsWarnings${module_suffix}.html + JIRA_COMMENT_FOOTER="Findbugs warnings: $BUILD_URL/artifact/PreCommit-HADOOP-Build-patchprocess/newPatchFindbugsWarnings${module_suffix}.html $JIRA_COMMENT_FOOTER" fi done diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 0ca2953fcc1..2e2d5698135 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -533,6 +533,8 @@ Release 2.6.0 - UNRELEASED HADOOP-11016. KMS should support signing cookies with zookeeper secret manager. (tucu) + HADOOP-11106. Document considerations of HAR and Encryption. (clamb via wang) + OPTIMIZATIONS HADOOP-10838. Byte array native checksumming. (James Thomas via todd) @@ -829,6 +831,9 @@ Release 2.6.0 - UNRELEASED HADOOP-11099. KMS return HTTP UNAUTHORIZED 401 on ACL failure. (tucu) + HADOOP-11105. MetricsSystemImpl could leak memory in registered callbacks. 
+ (Chuan Liu via cnauroth) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java index 722abd95c4a..2107e68895b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java @@ -83,7 +83,12 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { private final Map allSources; private final Map sinks; private final Map allSinks; + + // The callback list is used by register(Callback callback), while + // the callback map is used by register(String name, String desc, T sink) private final List callbacks; + private final Map namedCallbacks; + private final MetricsCollectorImpl collector; private final MetricsRegistry registry = new MetricsRegistry(MS_NAME); @Metric({"Snapshot", "Snapshot stats"}) MutableStat snapshotStat; @@ -119,6 +124,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { sourceConfigs = Maps.newHashMap(); sinkConfigs = Maps.newHashMap(); callbacks = Lists.newArrayList(); + namedCallbacks = Maps.newHashMap(); injectedTags = Lists.newArrayList(); collector = new MetricsCollectorImpl(); if (prefix != null) { @@ -178,11 +184,13 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { return; } for (Callback cb : callbacks) cb.preStart(); + for (Callback cb : namedCallbacks.values()) cb.preStart(); configure(prefix); startTimer(); monitoring = true; LOG.info(prefix +" metrics system started"); for (Callback cb : callbacks) cb.postStart(); + for (Callback cb : namedCallbacks.values()) cb.postStart(); } @Override @@ -198,6 +206,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { return; } for (Callback cb : callbacks) cb.preStop(); + for (Callback cb : namedCallbacks.values()) cb.preStop(); LOG.info("Stopping "+ prefix +" metrics system..."); stopTimer(); stopSources(); @@ -206,6 +215,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { monitoring = false; LOG.info(prefix +" metrics system stopped."); for (Callback cb : callbacks) cb.postStop(); + for (Callback cb : namedCallbacks.values()) cb.postStop(); } @Override public synchronized @@ -224,7 +234,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { } // We want to re-register the source to pick up new config when the // metrics system restarts. - register(new AbstractCallback() { + register(name, new AbstractCallback() { @Override public void postStart() { registerSource(finalName, finalDesc, s); } @@ -241,6 +251,9 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { if (allSources.containsKey(name)) { allSources.remove(name); } + if (namedCallbacks.containsKey(name)) { + namedCallbacks.remove(name); + } } synchronized @@ -268,7 +281,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { } // We want to re-register the sink to pick up new config // when the metrics system restarts. 
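The HADOOP-11105 hunks around this point replace unkeyed callback registration with name-keyed registration, which is what stops the leak: re-registering a source or sink under the same name now replaces its callback instead of appending another proxy. A minimal, self-contained sketch of that idea (hypothetical names, not the actual Hadoop API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the HADOOP-11105 idea: an unkeyed list grows by one proxy on
// every re-registration, while a name-keyed map replaces the old entry.
public class CallbackLeakSketch {
  interface Callback { void postStart(); }

  private final List<Callback> callbacks = new ArrayList<>();           // unbounded growth
  private final Map<String, Callback> namedCallbacks = new HashMap<>(); // bounded by name

  void registerUnkeyed(Callback cb) { callbacks.add(cb); }
  void registerNamed(String name, Callback cb) { namedCallbacks.put(name, cb); }

  public static void main(String[] args) {
    CallbackLeakSketch ms = new CallbackLeakSketch();
    // Emulate 1000 metrics-system restarts that each re-register one source.
    for (int restart = 0; restart < 1000; restart++) {
      ms.registerUnkeyed(() -> { });           // leaks: 1000 entries retained
      ms.registerNamed("source-1", () -> { }); // stable: always one entry
    }
    System.out.println("list size: " + ms.callbacks.size()
        + ", map size: " + ms.namedCallbacks.size()); // prints 1000 and 1
  }
}
```

Because long-running daemons restart the metrics system to pick up new configuration, the old append-only list accumulated one proxy per registered source on every restart; keying by name makes re-registration idempotent, and `unregisterSource` can drop the entry, as the change below does.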
- register(new AbstractCallback() { + register(name, new AbstractCallback() { @Override public void postStart() { register(name, description, sink); } @@ -289,9 +302,16 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { @Override public synchronized void register(final Callback callback) { - callbacks.add((Callback) Proxy.newProxyInstance( - callback.getClass().getClassLoader(), new Class[] { Callback.class }, - new InvocationHandler() { + callbacks.add((Callback) getProxyForCallback(callback)); + } + + private synchronized void register(String name, final Callback callback) { + namedCallbacks.put(name, (Callback) getProxyForCallback(callback)); + } + + private Object getProxyForCallback(final Callback callback) { + return Proxy.newProxyInstance(callback.getClass().getClassLoader(), + new Class[] { Callback.class }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { @@ -299,11 +319,11 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { return method.invoke(callback, args); } catch (Exception e) { // These are not considered fatal. - LOG.warn("Caught exception in callback "+ method.getName(), e); + LOG.warn("Caught exception in callback " + method.getName(), e); } return null; } - })); + }); } @Override @@ -572,6 +592,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { allSources.clear(); allSinks.clear(); callbacks.clear(); + namedCallbacks.clear(); if (mbeanName != null) { MBeans.unregister(mbeanName); mbeanName = null; diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java index f2d26115d10..aad20e0bec3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java @@ -241,7 +241,7 @@ class DFSClientCache { public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception { DFSClient client = getDfsClient(key.userId); DFSInputStream dis = client.open(key.inodePath); - return new FSDataInputStream(dis); + return client.createWrappedInputStream(dis); } }; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java index ede65c62837..71908d86530 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java @@ -678,7 +678,7 @@ class OpenFileCtx { } try { - fis = new FSDataInputStream(dfsClient.open(path)); + fis = dfsClient.createWrappedInputStream(dfsClient.open(path)); readCount = fis.read(offset, readbuffer, 0, count); if (readCount < count) { LOG.error("Can't read back " + count + " bytes, partial read size:" diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java index 0d591d63963..70c37d86cbf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java @@ -922,8 +922,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE); - fos = new HdfsDataOutputStream(dfsClient.create(fileIdPath, permission, - flag, false, replication, blockSize, null, bufferSize, null), + fos = dfsClient.createWrappedOutputStream( + dfsClient.create(fileIdPath, permission, flag, false, replication, + blockSize, null, bufferSize, null), statistics); if ((createMode == Nfs3Constant.CREATE_UNCHECKED) diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java index 05b976da8be..68efac2b9cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java @@ -17,19 +17,27 @@ */ package org.apache.hadoop.hdfs.nfs.nfs3; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.EOFException; +import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.EnumSet; +import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; +import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys; import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -98,12 +106,16 @@ public class TestRpcProgramNfs3 { static DistributedFileSystem hdfs; static MiniDFSCluster cluster = null; static NfsConfiguration config = new NfsConfiguration(); + static HdfsAdmin dfsAdmin; static NameNode nn; static Nfs3 nfs; static RpcProgramNfs3 nfsd; static SecurityHandler securityHandler; static SecurityHandler securityHandlerUnpriviledged; static String testdir = "/tmp"; + private static final String TEST_KEY = "testKey"; + private static FileSystemTestHelper fsHelper; + private static File testRootDir; @BeforeClass public static void setup() throws Exception { @@ -114,12 +126,20 @@ public class TestRpcProgramNfs3 { .getProxySuperuserGroupConfKey(currentUser), "*"); config.set(DefaultImpersonationProvider.getTestProvider() .getProxySuperuserIpConfKey(currentUser), "*"); + fsHelper = new FileSystemTestHelper(); + // Set up java key store + String testRoot = fsHelper.getTestRootDir(); + testRootDir = new File(testRoot).getAbsoluteFile(); + final Path jksPath = new Path(testRootDir.toString(), "test.jks"); + config.set(KeyProviderFactory.KEY_PROVIDER_PATH, + JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()); ProxyUsers.refreshSuperUserGroupsConfiguration(config); cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); cluster.waitActive(); hdfs = cluster.getFileSystem(); nn = cluster.getNameNode(); + dfsAdmin = new 
HdfsAdmin(cluster.getURI(), config); // Use ephemeral ports in case tests are running in parallel config.setInt("nfs3.mountd.port", 0); @@ -131,6 +151,8 @@ public class TestRpcProgramNfs3 { nfs.startServiceInternal(false); nfsd = (RpcProgramNfs3) nfs.getRpcProgram(); + hdfs.getClient().setKeyProvider(nn.getNamesystem().getProvider()); + DFSTestUtil.createKey(TEST_KEY, cluster, config); // Mock SecurityHandler which returns system user.name securityHandler = Mockito.mock(SecurityHandler.class); @@ -310,6 +332,105 @@ public class TestRpcProgramNfs3 { response2.getStatus()); } + @Test(timeout = 120000) + public void testEncryptedReadWrite() throws Exception { + final int len = 8192; + + final Path zone = new Path("/zone"); + hdfs.mkdirs(zone); + dfsAdmin.createEncryptionZone(zone, TEST_KEY); + + final byte[] buffer = new byte[len]; + for (int i = 0; i < len; i++) { + buffer[i] = (byte) i; + } + + final String encFile1 = "/zone/myfile"; + createFileUsingNfs(encFile1, buffer); + commit(encFile1, len); + assertArrayEquals("encFile1 not equal", + getFileContentsUsingNfs(encFile1, len), + getFileContentsUsingDfs(encFile1, len)); + + /* + * Same thing except this time create the encrypted file using DFS. + */ + final String encFile2 = "/zone/myfile2"; + final Path encFile2Path = new Path(encFile2); + DFSTestUtil.createFile(hdfs, encFile2Path, len, (short) 1, 0xFEED); + assertArrayEquals("encFile2 not equal", + getFileContentsUsingNfs(encFile2, len), + getFileContentsUsingDfs(encFile2, len)); + } + + private void createFileUsingNfs(String fileName, byte[] buffer) + throws Exception { + DFSTestUtil.createFile(hdfs, new Path(fileName), 0, (short) 1, 0); + + final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName); + final long dirId = status.getFileId(); + final FileHandle handle = new FileHandle(dirId); + + final WRITE3Request writeReq = new WRITE3Request(handle, 0, + buffer.length, WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer)); + final XDR xdr_req = new XDR(); + writeReq.serialize(xdr_req); + + final WRITE3Response response = nfsd.write(xdr_req.asReadOnlyWrap(), + null, 1, securityHandler, + new InetSocketAddress("localhost", 1234)); + assertEquals("Incorrect response: ", null, response); + } + + private byte[] getFileContentsUsingNfs(String fileName, int len) + throws Exception { + final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName); + final long dirId = status.getFileId(); + final FileHandle handle = new FileHandle(dirId); + + final READ3Request readReq = new READ3Request(handle, 0, len); + final XDR xdr_req = new XDR(); + readReq.serialize(xdr_req); + + final READ3Response response = nfsd.read(xdr_req.asReadOnlyWrap(), + securityHandler, new InetSocketAddress("localhost", 1234)); + assertEquals("Incorrect return code: ", Nfs3Status.NFS3_OK, + response.getStatus()); + assertTrue("expected full read", response.isEof()); + return response.getData().array(); + } + + private byte[] getFileContentsUsingDfs(String fileName, int len) + throws Exception { + final FSDataInputStream in = hdfs.open(new Path(fileName)); + final byte[] ret = new byte[len]; + in.readFully(ret); + try { + in.readByte(); + Assert.fail("expected end of file"); + } catch (EOFException e) { + // expected. 
Unfortunately there is no associated message to check + } + in.close(); + return ret; + } + + private void commit(String fileName, int len) throws Exception { + final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName); + final long dirId = status.getFileId(); + final FileHandle handle = new FileHandle(dirId); + final XDR xdr_req = new XDR(); + final COMMIT3Request req = new COMMIT3Request(handle, 0, len); + req.serialize(xdr_req); + + Channel ch = Mockito.mock(Channel.class); + + COMMIT3Response response2 = nfsd.commit(xdr_req.asReadOnlyWrap(), + ch, 1, securityHandler, + new InetSocketAddress("localhost", 1234)); + assertEquals("Incorrect COMMIT3Response:", null, response2); + } + @Test(timeout = 60000) public void testWrite() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index a6261949a27..a2f0e5a5655 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -477,6 +477,9 @@ Release 2.6.0 - UNRELEASED HDFS-7047. Expose FileStatus#isEncrypted in libhdfs (cmccabe) + HDFS-7003. Add NFS Gateway support for reading and writing to + encryption zones. (clamb via wang) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 456fac63425..dfc9e0dccc4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -3089,4 +3089,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public KeyProviderCryptoExtension getKeyProvider() { return provider; } + + @VisibleForTesting + public void setKeyProvider(KeyProviderCryptoExtension provider) { + this.provider = provider; + } } diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5d1e5f53bab..0c6bbf781b7 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -251,6 +251,9 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-5130. Add missing job config options to mapred-default.xml (Ray Chiang via Sandy Ryza) + MAPREDUCE-5891. Improved shuffle error handling across NM restarts + (Junping Du via jlowe) + OPTIMIZATIONS BUG FIXES @@ -347,6 +350,12 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-6070. yarn.app.am.resource.mb/cpu-vcores affects uber mode but is not documented (Tsuyoshi OZAWA via jlowe) + MAPREDUCE-6090. mapred hsadmin getGroups fails to connect in some cases + (Robert Kanter via jlowe) + + MAPREDUCE-6086. mapreduce.job.credentials.binary should allow all URIs. 
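The MAPREDUCE-6086 change just listed (applied to JobSubmitter and TokenCache below) replaces the hard-coded `file:///` prefix with `FileSystem.getLocal(conf).makeQualified(...)` so that `mapreduce.job.credentials.binary` may carry any URI. A JDK-only sketch of why the prefix breaks names that already carry a scheme (hypothetical paths; the real code uses Hadoop's Path and FileSystem):

```java
import java.net.URI;

// Sketch (plain JDK, not Hadoop): unconditionally prefixing "file:///"
// mangles names that already carry a scheme, which is what
// FileSystem.getLocal(conf).makeQualified(new Path(name)) avoids.
public class TokenPathSketch {
  public static void main(String[] args) {
    String relative = "tokens/binary.token";
    String withScheme = "hdfs://nn:8020/user/alice/binary.token";

    // Old behavior: always prefix file:///
    System.out.println(URI.create("file:///" + relative));   // ok: file:///tokens/binary.token
    System.out.println(URI.create("file:///" + withScheme)); // wrong: file:///hdfs://nn:8020/...

    // Qualifying instead: keep an existing scheme, default otherwise
    System.out.println(qualify(relative));
    System.out.println(qualify(withScheme));
  }

  // Hypothetical stand-in for makeQualified(): defaults only schemeless names
  static URI qualify(String name) {
    URI uri = URI.create(name);
    return uri.getScheme() != null ? uri : URI.create("file:///" + name);
  }
}
```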
+ (Zhihai Xu via kasha) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java index 0734e7f2953..6cd569a65ce 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java @@ -578,7 +578,9 @@ class JobSubmitter { conf.get("mapreduce.job.credentials.binary"); if (binaryTokenFilename != null) { Credentials binary = Credentials.readTokenStorageFile( - new Path("file:///" + binaryTokenFilename), conf); + FileSystem.getLocal(conf).makeQualified( + new Path(binaryTokenFilename)), + conf); credentials.addAll(binary); } // add secret keys coming from a json file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 4c48cf51235..e39dd6a30cd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -298,6 +298,14 @@ public interface MRJobConfig { public static final String MAX_FETCH_FAILURES_NOTIFICATIONS = "mapreduce.reduce.shuffle.max-fetch-failures-notifications"; public static final int DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS = 3; + + public static final String SHUFFLE_FETCH_RETRY_INTERVAL_MS = "mapreduce.reduce.shuffle.fetch.retry.interval-ms"; + /** Default interval that fetcher retry to fetch during NM restart.*/ + public final static int DEFAULT_SHUFFLE_FETCH_RETRY_INTERVAL_MS = 1000; + + public static final String SHUFFLE_FETCH_RETRY_TIMEOUT_MS = "mapreduce.reduce.shuffle.fetch.retry.timeout-ms"; + + public static final String SHUFFLE_FETCH_RETRY_ENABLED = "mapreduce.reduce.shuffle.fetch.retry.enabled"; public static final String SHUFFLE_NOTIFY_READERROR = "mapreduce.reduce.shuffle.notify.readerror"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java index cadd04b5634..7b1f657ec40 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java @@ -134,7 +134,9 @@ public class TokenCache { Credentials binary; try { binary = Credentials.readTokenStorageFile( - new Path("file:///" + binaryTokenFilename), conf); + FileSystem.getLocal(conf).makeQualified( + new Path(binaryTokenFilename)), + conf); } catch (IOException e) { throw new RuntimeException(e); } diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index e1e16635a1b..a41620058cf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -27,6 +27,7 @@ import java.net.URL; import java.net.URLConnection; import java.security.GeneralSecurityException; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -46,6 +47,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; import org.apache.hadoop.mapreduce.CryptoUtils; import org.apache.hadoop.security.ssl.SSLFactory; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import com.google.common.annotations.VisibleForTesting; @@ -85,10 +88,18 @@ class Fetcher extends Thread { private final int connectionTimeout; private final int readTimeout; + private final int fetchRetryTimeout; + private final int fetchRetryInterval; + + private final boolean fetchRetryEnabled; + private final SecretKey shuffleSecretKey; protected HttpURLConnection connection; private volatile boolean stopped = false; + + // Initial value is 0, which means it hasn't retried yet. + private long retryStartTime = 0; private static boolean sslShuffle; private static SSLFactory sslFactory; @@ -135,6 +146,19 @@ class Fetcher extends Thread { this.readTimeout = job.getInt(MRJobConfig.SHUFFLE_READ_TIMEOUT, DEFAULT_READ_TIMEOUT); + this.fetchRetryInterval = job.getInt(MRJobConfig.SHUFFLE_FETCH_RETRY_INTERVAL_MS, + MRJobConfig.DEFAULT_SHUFFLE_FETCH_RETRY_INTERVAL_MS); + + this.fetchRetryTimeout = job.getInt(MRJobConfig.SHUFFLE_FETCH_RETRY_TIMEOUT_MS, + DEFAULT_STALLED_COPY_TIMEOUT); + + boolean shuffleFetchEnabledDefault = job.getBoolean( + YarnConfiguration.NM_RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED); + this.fetchRetryEnabled = job.getBoolean( + MRJobConfig.SHUFFLE_FETCH_RETRY_ENABLED, + shuffleFetchEnabledDefault); + setName("fetcher#" + id); setDaemon(true); @@ -242,6 +266,8 @@ class Fetcher extends Thread { */ @VisibleForTesting protected void copyFromHost(MapHost host) throws IOException { + // reset retryStartTime for a new host + retryStartTime = 0; // Get completed maps on 'host' List maps = scheduler.getMapsForHost(host); @@ -261,60 +287,14 @@ class Fetcher extends Thread { // Construct the url and connect DataInputStream input = null; + URL url = getMapOutputURL(host, maps); try { - URL url = getMapOutputURL(host, maps); - openConnection(url); + setupConnectionsWithRetry(host, remaining, url); + if (stopped) { abortConnect(host, remaining); return; } - - // generate hash of the url - String msgToEncode = SecureShuffleUtils.buildMsgFrom(url); - String encHash = SecureShuffleUtils.hashFromString(msgToEncode, - shuffleSecretKey); - - // put url hash into http header - connection.addRequestProperty( - SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash); - // set the read timeout - connection.setReadTimeout(readTimeout); - // put shuffle version into http header - 
connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, - ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); - connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, - ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); - connect(connection, connectionTimeout); - // verify that the thread wasn't stopped during calls to connect - if (stopped) { - abortConnect(host, remaining); - return; - } - input = new DataInputStream(connection.getInputStream()); - - // Validate response code - int rc = connection.getResponseCode(); - if (rc != HttpURLConnection.HTTP_OK) { - throw new IOException( - "Got invalid response code " + rc + " from " + url + - ": " + connection.getResponseMessage()); - } - // get the shuffle version - if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals( - connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) - || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals( - connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) { - throw new IOException("Incompatible shuffle response version"); - } - // get the replyHash which is HMac of the encHash we sent to the server - String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH); - if(replyHash==null) { - throw new IOException("security validation of TT Map output failed"); - } - LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash); - // verify that replyHash is HMac of encHash - SecureShuffleUtils.verifyReply(replyHash, encHash, shuffleSecretKey); - LOG.info("for url="+msgToEncode+" sent hash and received reply"); } catch (IOException ie) { boolean connectExcpt = ie instanceof ConnectException; ioErrs.increment(1); @@ -336,6 +316,8 @@ return; } + input = new DataInputStream(connection.getInputStream()); + try { // Loop through available map-outputs and fetch them // On any error, failedTasks is not null and we exit // after putting back the remaining maps to the // yet_to_be_fetched list and marking the failed tasks. TaskAttemptID[] failedTasks = null; while (!remaining.isEmpty() && failedTasks == null) { - failedTasks = copyMapOutput(host, input, remaining); + try { + failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled); + } catch (IOException e) { + // + // Setup connection again if disconnected by NM + connection.disconnect(); + // Get map output from remaining tasks only. + url = getMapOutputURL(host, remaining); + 
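The catch block above, together with `setupConnectionsWithRetry` and `openConnectionWithRetry` below, implements a retry-until-deadline loop. A self-contained sketch of the same pattern (hypothetical names; the real code uses `Time.monotonicNow()` and the `MRJobConfig` keys for the interval and timeout):

```java
import java.io.IOException;

// Sketch of the retry-until-deadline pattern behind openConnectionWithRetry:
// keep retrying a transient failure, sleeping a fixed interval between
// attempts, and rethrow once the overall timeout is exhausted.
public class RetrySketch {
  static final long RETRY_INTERVAL_MS = 1000;  // mapreduce.reduce.shuffle.fetch.retry.interval-ms
  static final long RETRY_TIMEOUT_MS = 30000;  // mapreduce.reduce.shuffle.fetch.retry.timeout-ms

  interface Action { void run() throws IOException; }

  static void runWithRetry(Action action, boolean retryEnabled)
      throws IOException, InterruptedException {
    long start = System.nanoTime() / 1_000_000; // monotonic, like Time.monotonicNow()
    while (true) {
      try {
        action.run();
        return;                                // success, stop retrying
      } catch (IOException e) {
        if (!retryEnabled) {
          throw e;                             // retries disabled: fail fast
        }
        long elapsed = System.nanoTime() / 1_000_000 - start;
        if (elapsed >= RETRY_TIMEOUT_MS) {
          throw e;                             // deadline exhausted: give up
        }
        Thread.sleep(RETRY_INTERVAL_MS);       // wait for the NM to come back
      }
    }
  }

  public static void main(String[] args) throws Exception {
    long recoverAt = System.nanoTime() / 1_000_000 + 2500;
    runWithRetry(() -> {
      if (System.nanoTime() / 1_000_000 < recoverAt) {
        throw new IOException("host still restarting"); // transient failure
      }
      System.out.println("connected");
    }, true);
  }
}
```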
+ // Connect with retry, since the host's recovery may take some time. + setupConnectionsWithRetry(host, remaining, url); + if (stopped) { + abortConnect(host, remaining); + return; + } + input = new DataInputStream(connection.getInputStream()); + } } if(failedTasks != null && failedTasks.length > 0) { @@ -371,19 +369,111 @@ } } } + + private void setupConnectionsWithRetry(MapHost host, + Set remaining, URL url) throws IOException { + openConnectionWithRetry(host, remaining, url); + if (stopped) { + return; + } + + // generate hash of the url + String msgToEncode = SecureShuffleUtils.buildMsgFrom(url); + String encHash = SecureShuffleUtils.hashFromString(msgToEncode, + shuffleSecretKey); + + setupShuffleConnection(encHash); + connect(connection, connectionTimeout); + // verify that the thread wasn't stopped during calls to connect + if (stopped) { + return; + } + + verifyConnection(url, msgToEncode, encHash); + } + + private void openConnectionWithRetry(MapHost host, + Set remaining, URL url) throws IOException { + long startTime = Time.monotonicNow(); + boolean shouldWait = true; + while (shouldWait) { + try { + openConnection(url); + shouldWait = false; + } catch (IOException e) { + if (!fetchRetryEnabled) { + // throw the exception directly if fetch retry is not enabled + throw e; + } + if ((Time.monotonicNow() - startTime) >= this.fetchRetryTimeout) { + LOG.warn("Failed to connect to host: " + url + " after " + + fetchRetryTimeout + " milliseconds."); + throw e; + } + try { + Thread.sleep(this.fetchRetryInterval); + } catch (InterruptedException e1) { + if (stopped) { + return; + } + } + } + } + } + + private void verifyConnection(URL url, String msgToEncode, String encHash) + throws IOException { + // Validate response code + int rc = connection.getResponseCode(); + if (rc != HttpURLConnection.HTTP_OK) { + throw new IOException( + "Got invalid response code " + rc + " from " + url + + ": " + connection.getResponseMessage()); + } + // get the shuffle version + if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals( + connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) + || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals( + connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) { + throw new IOException("Incompatible shuffle response version"); + } + // get the replyHash which is HMac of the encHash we sent to the server + String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH); + if(replyHash==null) { + throw new IOException("security validation of TT Map output failed"); + } + LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash); + // verify that replyHash is HMac of encHash + SecureShuffleUtils.verifyReply(replyHash, encHash, shuffleSecretKey); + LOG.info("for url="+msgToEncode+" sent hash and received reply"); + } + + private void setupShuffleConnection(String encHash) { + // put url hash into http header + connection.addRequestProperty( + SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash); + // set the read timeout + connection.setReadTimeout(readTimeout); + // put shuffle version into http header + connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + } private static TaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptID[0]; private TaskAttemptID[] copyMapOutput(MapHost host, DataInputStream input, - Set remaining) { + Set remaining, + boolean canRetry) throws IOException { MapOutput 
mapOutput = null; TaskAttemptID mapId = null; long decompressedLength = -1; long compressedLength = -1; try { - long startTime = System.currentTimeMillis(); + long startTime = Time.monotonicNow(); int forReduce = -1; //Read the shuffle header try { @@ -449,7 +539,10 @@ class Fetcher extends Thread { } // Inform the shuffle scheduler - long endTime = System.currentTimeMillis(); + long endTime = Time.monotonicNow(); + // Reset retryStartTime, as the map task made progress if we retried before. + retryStartTime = 0; + scheduler.copySucceeded(mapId, host, compressedLength, endTime - startTime, mapOutput); // Note successful shuffle @@ -457,9 +550,14 @@ metrics.successFetch(); return null; } catch (IOException ioe) { + + if (canRetry) { + checkTimeoutOrRetry(host, ioe); + } + ioErrs.increment(1); if (mapId == null || mapOutput == null) { - LOG.info("fetcher#" + id + " failed to read map header" + + LOG.warn("fetcher#" + id + " failed to read map header " + mapId + " decomp: " + decompressedLength + ", " + compressedLength, ioe); if(mapId == null) { @@ -468,7 +566,7 @@ return new TaskAttemptID[] {mapId}; } } - + LOG.warn("Failed to shuffle output of " + mapId + " from " + host.getHostName(), ioe); @@ -479,6 +577,29 @@ } } + + /** Check whether the retry has timed out; if not, throw an exception to + * start a new round of retry. */ + private void checkTimeoutOrRetry(MapHost host, IOException ioe) + throws IOException { + // Record the start time on the first retry. + long currentTime = Time.monotonicNow(); + if (retryStartTime == 0) { + retryStartTime = currentTime; + } + + // The retry has not timed out yet; retry by rethrowing the exception. + if (currentTime - retryStartTime < this.fetchRetryTimeout) { + LOG.warn("Shuffle output from " + host.getHostName() + + " failed, retrying it."); + throw ioe; + } else { + // Retry timed out; allow the copy to be marked as failed. 
+ LOG.warn("Timeout for copying MapOutput with retry on host " + host + + "after " + fetchRetryTimeout + "milliseconds."); + + } + } /** * Do some basic verification on the input received -- Being defensive @@ -525,7 +646,7 @@ class Fetcher extends Thread { * @return * @throws MalformedURLException */ - private URL getMapOutputURL(MapHost host, List maps + private URL getMapOutputURL(MapHost host, Collection maps ) throws MalformedURLException { // Get the base url StringBuffer url = new StringBuffer(host.getBaseUrl()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java index 63f326632ef..e48a73a0c12 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.task.reduce.MapHost.State; import org.apache.hadoop.util.Progress; +import org.apache.hadoop.util.Time; @InterfaceAudience.Private @InterfaceStability.Unstable @@ -121,7 +122,7 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { this.shuffledMapsCounter = shuffledMapsCounter; this.reduceShuffleBytes = reduceShuffleBytes; this.failedShuffleCounter = failedShuffleCounter; - this.startTime = System.currentTimeMillis(); + this.startTime = Time.monotonicNow(); lastProgressTime = startTime; referee.start(); this.maxFailedUniqueFetches = Math.min(totalMaps, 5); @@ -198,7 +199,7 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { totalBytesShuffledTillNow += bytes; updateStatus(); reduceShuffleBytes.increment(bytes); - lastProgressTime = System.currentTimeMillis(); + lastProgressTime = Time.monotonicNow(); LOG.debug("map " + mapId + " done " + status.getStateString()); } } @@ -206,7 +207,7 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { private void updateStatus() { float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024); int mapsDone = totalMaps - remainingMaps; - long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1; + long secsSinceStart = (Time.monotonicNow() - startTime) / 1000 + 1; float transferRate = mbs / secsSinceStart; progress.set((float) mapsDone / totalMaps); @@ -307,7 +308,7 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { // check if the reducer is stalled for a long time // duration for which the reducer is stalled int stallDuration = - (int)(System.currentTimeMillis() - lastProgressTime); + (int)(Time.monotonicNow() - lastProgressTime); // duration for which the reducer ran with progress int shuffleProgressDuration = @@ -389,7 +390,7 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { LOG.info("Assigning " + host + " with " + host.getNumKnownMapOutputs() + " to " + Thread.currentThread().getName()); - shuffleStart.set(System.currentTimeMillis()); + shuffleStart.set(Time.monotonicNow()); return host; } @@ -430,7 +431,7 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { } } LOG.info(host + " freed by " + 
Thread.currentThread().getName() + " in " + - (System.currentTimeMillis()-shuffleStart.get()) + "ms"); + (Time.monotonicNow()-shuffleStart.get()) + "ms"); } public synchronized void resetKnownMaps() { @@ -464,12 +465,12 @@ Penalty(MapHost host, long delay) { this.host = host; - this.endTime = System.currentTimeMillis() + delay; + this.endTime = Time.monotonicNow() + delay; } @Override public long getDelay(TimeUnit unit) { - long remainingTime = endTime - System.currentTimeMillis(); + long remainingTime = endTime - Time.monotonicNow(); return unit.convert(remainingTime, TimeUnit.MILLISECONDS); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 6cefdc97b52..d1052c5b249 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -128,6 +128,27 @@ + <property> + <name>mapreduce.reduce.shuffle.fetch.retry.enabled</name> + <value>${yarn.nodemanager.recovery.enabled}</value> + <description>Whether to enable fetch retry during host restart.</description> + </property> + + <property> + <name>mapreduce.reduce.shuffle.fetch.retry.interval-ms</name> + <value>1000</value> + <description>The time interval between fetcher retries when a + non-fatal failure occurs because of an event such as an NM restart. + </description> + </property> + + <property> + <name>mapreduce.reduce.shuffle.fetch.retry.timeout-ms</name> + <value>30000</value> + <description>The total time the fetcher keeps retrying after a + non-fatal failure occurs because of an event such as an NM restart, + before giving up.</description> + </property> + <property> <name>mapreduce.reduce.shuffle.retry-delay.max.ms</name> <value>60000</value> diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/HadoopArchives.md.vm b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/HadoopArchives.md.vm index db0a25f7e4f..be557a73293 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/HadoopArchives.md.vm +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/HadoopArchives.md.vm @@ -59,6 +59,11 @@ How to Create an Archive `hadoop archive -archiveName zoo.har -p /foo/bar -r 3 /outputdir` + If you specify source files that are in an encryption zone, they will be + decrypted and written into the archive. If the har file is not located in an + encryption zone, they will be stored in clear (decrypted) form. If the + har file is located in an encryption zone, they will be stored in encrypted form. 
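The HadoopArchives.md.vm paragraph above implies that where the `.har` file lands decides whether archived copies of encrypted files stay protected. A hedged sketch of a pre-flight check using `HdfsAdmin.getEncryptionZoneForPath` (the NameNode URI and paths are placeholders):

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;

// Sketch: check whether a HAR destination sits inside an encryption zone
// before archiving, since (per the doc change above) archived copies of
// encrypted files are stored in the clear unless the .har itself is in a zone.
public class HarZoneCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    HdfsAdmin admin = new HdfsAdmin(URI.create("hdfs://nn:8020"), conf); // hypothetical NN

    Path harOutput = new Path("/outputdir/zoo.har");
    // Returns null when the path is not inside any encryption zone.
    EncryptionZone ez = admin.getEncryptionZoneForPath(harOutput);
    if (ez == null) {
      System.err.println("WARNING: " + harOutput
          + " is not in an encryption zone; archived data will be stored in the clear.");
    } else {
      System.out.println(harOutput + " is protected by zone " + ez.getPath());
    }
  }
}
```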
+ How to Look Up Files in Archives -------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java index de594d405e3..127f8ae35b3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java @@ -63,12 +63,25 @@ public class TestTokenCache { @Test @SuppressWarnings("deprecation") - public void testBinaryCredentials() throws Exception { + public void testBinaryCredentialsWithoutScheme() throws Exception { + testBinaryCredentials(false); + } + + @Test + @SuppressWarnings("deprecation") + public void testBinaryCredentialsWithScheme() throws Exception { + testBinaryCredentials(true); + } + + private void testBinaryCredentials(boolean hasScheme) throws Exception { Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","test/build/data")); // ick, but need fq path minus file:/ - String binaryTokenFile = FileSystem.getLocal(conf).makeQualified( - new Path(TEST_ROOT_DIR, "tokenFile")).toUri().getPath(); + String binaryTokenFile = hasScheme + ? FileSystem.getLocal(conf).makeQualified( + new Path(TEST_ROOT_DIR, "tokenFile")).toString() + : FileSystem.getLocal(conf).makeQualified( + new Path(TEST_ROOT_DIR, "tokenFile")).toUri().getPath(); MockFileSystem fs1 = createFileSystemForServiceName("service1"); MockFileSystem fs2 = createFileSystemForServiceName("service2"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java index 3db382e4f44..7736c4854ff 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java @@ -27,6 +27,7 @@ import java.net.HttpURLConnection; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.MapOutputFile; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskID; import org.junit.After; @@ -60,6 +61,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.Time; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -71,6 +73,7 @@ import org.mockito.stubbing.Answer; public class TestFetcher { private static final Log LOG = LogFactory.getLog(TestFetcher.class); JobConf job = null; + JobConf jobWithRetry = null; TaskAttemptID id = null; ShuffleSchedulerImpl ss = null; MergeManagerImpl mm = null; @@ -93,6 +96,9 @@ public class TestFetcher { public void setup() { LOG.info(">>>> " + name.getMethodName()); job = 
new JobConf(); + job.setBoolean(MRJobConfig.SHUFFLE_FETCH_RETRY_ENABLED, false); + jobWithRetry = new JobConf(); + jobWithRetry.setBoolean(MRJobConfig.SHUFFLE_FETCH_RETRY_ENABLED, true); id = TaskAttemptID.forName("attempt_0_1_r_1_1"); ss = mock(ShuffleSchedulerImpl.class); mm = mock(MergeManagerImpl.class); @@ -228,6 +234,38 @@ public class TestFetcher { verify(ss, times(3)).putBackKnownMapOutput(any(MapHost.class), eq(map1ID)); verify(ss, times(3)).putBackKnownMapOutput(any(MapHost.class), eq(map2ID)); } + + @Test + public void testCopyFromHostIncompatibleShuffleVersionWithRetry() + throws Exception { + String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); + + when(connection.getResponseCode()).thenReturn(200); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) + .thenReturn("mapreduce").thenReturn("other").thenReturn("other"); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION)) + .thenReturn("1.0.1").thenReturn("1.0.0").thenReturn("1.0.1"); + when(connection.getHeaderField( + SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash); + ByteArrayInputStream in = new ByteArrayInputStream(new byte[0]); + when(connection.getInputStream()).thenReturn(in); + + for (int i = 0; i < 3; ++i) { + Fetcher underTest = new FakeFetcher(jobWithRetry, + id, ss, mm, r, metrics, except, key, connection); + underTest.copyFromHost(host); + } + + verify(connection, times(3)).addRequestProperty( + SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash); + + verify(allErrs, times(3)).increment(1); + verify(ss, times(3)).copyFailed(map1ID, host, false, false); + verify(ss, times(3)).copyFailed(map2ID, host, false, false); + + verify(ss, times(3)).putBackKnownMapOutput(any(MapHost.class), eq(map1ID)); + verify(ss, times(3)).putBackKnownMapOutput(any(MapHost.class), eq(map2ID)); + } @Test public void testCopyFromHostWait() throws Exception { @@ -301,6 +339,48 @@ public class TestFetcher { encHash); verify(ss, times(1)).copyFailed(map1ID, host, true, false); } + + @SuppressWarnings("unchecked") + @Test(timeout=10000) + public void testCopyFromHostWithRetry() throws Exception { + InMemoryMapOutput immo = mock(InMemoryMapOutput.class); + ss = mock(ShuffleSchedulerImpl.class); + Fetcher underTest = new FakeFetcher(jobWithRetry, + id, ss, mm, r, metrics, except, key, connection, true); + + String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); + + when(connection.getResponseCode()).thenReturn(200); + when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)) + .thenReturn(replyHash); + ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1); + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + header.write(new DataOutputStream(bout)); + ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); + when(connection.getInputStream()).thenReturn(in); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) + .thenReturn(immo); + + final long retryTime = Time.monotonicNow(); + doAnswer(new Answer() { + public Void answer(InvocationOnMock ignore) throws IOException { + // Emulate host down for 3 seconds. 
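The `testCopyFromHostWithRetry` test continuing below drives the retry path with a Mockito `Answer` that fails for a fixed window and then succeeds. A standalone sketch of that time-based flaky-mock pattern (hypothetical `Shuffler` interface, not the Hadoop types):

```java
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.io.IOException;

// Sketch of the time-based Mockito Answer pattern used in the test below:
// the mocked collaborator fails for a fixed window, then succeeds, which
// lets the test prove a retry loop rides out a simulated host restart.
public class FlakyMockSketch {
  interface Shuffler { void shuffle() throws IOException; }

  public static void main(String[] args) throws Exception {
    Shuffler shuffler = mock(Shuffler.class);
    final long downUntil = System.currentTimeMillis() + 3000; // 3s outage
    doAnswer(invocation -> {
      if (System.currentTimeMillis() <= downUntil) {
        throw new IOException("host restarting");             // still down
      }
      return null;                                            // recovered
    }).when(shuffler).shuffle();

    // A retry loop like the Fetcher's eventually gets through.
    while (true) {
      try { shuffler.shuffle(); break; }
      catch (IOException e) { Thread.sleep(500); }
    }
    System.out.println("shuffle succeeded after simulated recovery");
  }
}
```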
+ if ((Time.monotonicNow() - retryTime) <= 3000) { + throw new java.lang.InternalError(); + } + return null; + } + }).when(immo).shuffle(any(MapHost.class), any(InputStream.class), anyLong(), + anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class)); + + underTest.copyFromHost(host); + verify(ss, never()).copyFailed(any(TaskAttemptID.class),any(MapHost.class), + anyBoolean(), anyBoolean()); + } @Test public void testCopyFromHostExtraBytes() throws Exception { @@ -447,6 +527,9 @@ } public static class FakeFetcher extends Fetcher { + // Whether the connection needs to be reopened. + private boolean renewConnection = false; + public FakeFetcher(JobConf job, TaskAttemptID reduceId, ShuffleSchedulerImpl scheduler, MergeManagerImpl merger, Reporter reporter, ShuffleClientMetrics metrics, @@ -456,6 +539,17 @@ exceptionReporter, jobTokenSecret); this.connection = connection; } + + public FakeFetcher(JobConf job, TaskAttemptID reduceId, + ShuffleSchedulerImpl scheduler, MergeManagerImpl merger, + Reporter reporter, ShuffleClientMetrics metrics, + ExceptionReporter exceptionReporter, SecretKey jobTokenSecret, + HttpURLConnection connection, boolean renewConnection) { + super(job, reduceId, scheduler, merger, reporter, metrics, + exceptionReporter, jobTokenSecret); + this.connection = connection; + this.renewConnection = renewConnection; + } public FakeFetcher(JobConf job, TaskAttemptID reduceId, ShuffleSchedulerImpl scheduler, MergeManagerImpl merger, @@ -469,7 +563,7 @@ } @Override protected void openConnection(URL url) throws IOException { - if (null == connection) { + if (null == connection || renewConnection) { super.openConnection(url); } // already 'opened' the mocked connection diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java index be6ca131819..000ea54618b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java @@ -25,6 +25,7 @@ import java.util.Arrays; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.v2.hs.HSProxies; import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; @@ -41,7 +42,7 @@ public class HSAdmin extends Configured implements Tool { super(); } - public HSAdmin(Configuration conf) { + public HSAdmin(JobConf conf) { super(conf); } @@ -331,7 +332,8 @@ } public static void main(String[] args) throws Exception { - int result = ToolRunner.run(new HSAdmin(), args); + JobConf conf = new JobConf(); + int result = ToolRunner.run(new HSAdmin(conf), args); System.exit(result); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java index 277a1953c8e..2c239ec2bb8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java @@ -28,6 +28,7 @@ import java.util.List; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.v2.hs.JobHistory; import org.apache.hadoop.mapreduce.v2.hs.client.HSAdmin; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; @@ -48,7 +49,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService; public class TestHSAdminServer { private HSAdminServer hsAdminServer = null; private HSAdmin hsAdminClient = null; - Configuration conf = null; + JobConf conf = null; private static long groupRefreshTimeoutSec = 1; JobHistory jobHistoryService = null; AggregatedLogDeletionService alds = null; @@ -81,7 +82,7 @@ public class TestHSAdminServer { @Before public void init() throws HadoopIllegalArgumentException, IOException { - conf = new Configuration(); + conf = new JobConf(); conf.set(JHAdminConfig.JHS_ADMIN_ADDRESS, "0.0.0.0:0"); conf.setClass("hadoop.security.group.mapping", MockUnixGroupsMapping.class, GroupMappingServiceProvider.class); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java index b92400dbf6a..7a2c03b1b0b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java @@ -150,30 +150,15 @@ public class TestBinaryTokenFile { // Credentials in the job will not have delegation tokens // because security is disabled. Fetch delegation tokens // and store in binary token file. - try { - Credentials cred1 = new Credentials(); - Credentials cred2 = new Credentials(); - TokenCache.obtainTokensForNamenodesInternal(cred1, new Path[] { p1 }, - job.getConfiguration()); - for (Token t : cred1.getAllTokens()) { - cred2.addToken(new Text(DELEGATION_TOKEN_KEY), t); - } - DataOutputStream os = new DataOutputStream(new FileOutputStream( - binaryTokenFileName.toString())); - try { - cred2.writeTokenStorageToStream(os); - } finally { - os.close(); - } - job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, - binaryTokenFileName.toString()); - // NB: the MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY key now gets deleted from config, - // so it's not accessible in the job's config. 
So, we use another key to pass the file name into the job configuration: - job.getConfiguration().set(KEY_SECURITY_TOKEN_FILE_NAME, - binaryTokenFileName.toString()); - } catch (IOException e) { - Assert.fail("Exception " + e); - } + createBinaryTokenFile(job.getConfiguration()); + job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, + binaryTokenFileName.toString()); + // NB: the MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY + // key now gets deleted from config, + // so it's not accessible in the job's config. So, + // we use another key to pass the file name into the job configuration: + job.getConfiguration().set(KEY_SECURITY_TOKEN_FILE_NAME, + binaryTokenFileName.toString()); } } @@ -225,7 +210,29 @@ public class TestBinaryTokenFile { dfsCluster = null; } } - + + private static void createBinaryTokenFile(Configuration conf) { + // Fetch delegation tokens and store in binary token file. + try { + Credentials cred1 = new Credentials(); + Credentials cred2 = new Credentials(); + TokenCache.obtainTokensForNamenodesInternal(cred1, new Path[] { p1 }, + conf); + for (Token t : cred1.getAllTokens()) { + cred2.addToken(new Text(DELEGATION_TOKEN_KEY), t); + } + DataOutputStream os = new DataOutputStream(new FileOutputStream( + binaryTokenFileName.toString())); + try { + cred2.writeTokenStorageToStream(os); + } finally { + os.close(); + } + } catch (IOException e) { + Assert.fail("Exception " + e); + } + } + /** * run a distributed job and verify that TokenCache is available * @throws IOException @@ -252,4 +259,33 @@ public class TestBinaryTokenFile { } assertEquals("dist job res is not 0:", 0, res); } + + /** + * run a distributed job with -tokenCacheFile option parameter and + * verify that no exception happens. + * @throws IOException + */ + @Test + public void testTokenCacheFile() throws IOException { + Configuration conf = mrCluster.getConfig(); + createBinaryTokenFile(conf); + // provide namenodes names for the job to get the delegation tokens for + final String nnUri = dfsCluster.getURI(0).toString(); + conf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri); + + // using argument to pass the file name + final String[] args = { + "-tokenCacheFile", binaryTokenFileName.toString(), + "-m", "1", "-r", "1", "-mt", "1", "-rt", "1" + }; + int res = -1; + try { + res = ToolRunner.run(conf, new SleepJob(), args); + } catch (Exception e) { + System.out.println("Job failed with " + e.getLocalizedMessage()); + e.printStackTrace(System.out); + fail("Job failed"); + } + assertEquals("dist job res is not 0:", 0, res); + } } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c179c7fcd94..759f2bfee7c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -389,6 +389,14 @@ Release 2.6.0 - UNRELEASED YARN-1779. Fixed AMRMClient to handle AMRMTokens correctly across ResourceManager work-preserving-restart or failover. (Jian He via vinodkv) + YARN-2363. Submitted applications occasionally lack a tracking URL (jlowe) + + YARN-2561. MR job client cannot reconnect to AM after NM restart. (Junping + Du via jlowe) + + YARN-2563. Fixed YarnClient to call getTimeLineDelegationToken only if the + Token is not present. 
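The YARN-2563 entry above (fixed in YarnClientImpl, below) reorders the logic so a timeline delegation token is fetched only when the launch context does not already carry one of that kind. A minimal sketch of the check-before-fetch ordering (hypothetical string-keyed credentials; the real code compares `token.getKind()` to `TimelineDelegationTokenIdentifier.KIND_NAME`):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the YARN-2563 ordering fix: look for an existing token of the
// right kind first, and only contact the timeline server when none is
// present, so a pre-supplied token avoids the RPC entirely.
public class TokenDedupSketch {
  static final String TIMELINE_KIND = "TIMELINE_DELEGATION_TOKEN";

  static int rpcCalls = 0;

  static String fetchFromTimelineServer() {
    rpcCalls++;                       // expensive RPC that may fail when the
    return "fresh-token";             // timeline server is unreachable
  }

  static void addTimelineToken(Map<String, String> credentials) {
    if (credentials.containsKey(TIMELINE_KIND)) {
      return;                         // already have one: no RPC at all
    }
    credentials.put(TIMELINE_KIND, fetchFromTimelineServer());
  }

  public static void main(String[] args) {
    Map<String, String> creds = new HashMap<>();
    creds.put(TIMELINE_KIND, "existing-token");
    addTimelineToken(creds);          // no RPC
    creds.clear();
    addTimelineToken(creds);          // one RPC
    System.out.println("RPC calls: " + rpcCalls); // prints 1
  }
}
```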
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index c179c7fcd94..759f2bfee7c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -389,6 +389,14 @@ Release 2.6.0 - UNRELEASED
     YARN-1779. Fixed AMRMClient to handle AMRMTokens correctly across
     ResourceManager work-preserving-restart or failover. (Jian He via vinodkv)
 
+    YARN-2363. Submitted applications occasionally lack a tracking URL (jlowe)
+
+    YARN-2561. MR job client cannot reconnect to AM after NM restart. (Junping
+    Du via jlowe)
+
+    YARN-2563. Fixed YarnClient to call getTimeLineDelegationToken only if the
+    Token is not present. (Zhijie Shen via jianhe)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index f1a3b6eecea..def6da55ea8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -110,7 +110,8 @@ public class YarnClientImpl extends YarnClient {
   private AHSClient historyClient;
   private boolean historyServiceEnabled;
   protected TimelineClient timelineClient;
-  protected Text timelineService;
+  @VisibleForTesting
+  Text timelineService;
   protected boolean timelineServiceEnabled;
 
   private static final String ROOT = "root";
@@ -272,12 +273,6 @@ public class YarnClientImpl extends YarnClient {
 
   private void addTimelineDelegationToken(
       ContainerLaunchContext clc) throws YarnException, IOException {
-    org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier>
-        timelineDelegationToken = timelineClient.getDelegationToken(
-            UserGroupInformation.getCurrentUser().getUserName());
-    if (timelineDelegationToken == null) {
-      return;
-    }
     Credentials credentials = new Credentials();
     DataInputByteBuffer dibb = new DataInputByteBuffer();
     ByteBuffer tokens = clc.getTokens();
@@ -290,11 +285,15 @@ public class YarnClientImpl extends YarnClient {
     // one more
     for (org.apache.hadoop.security.token.Token token : credentials
         .getAllTokens()) {
-      TokenIdentifier tokenIdentifier = token.decodeIdentifier();
-      if (tokenIdentifier instanceof TimelineDelegationTokenIdentifier) {
+      if (token.getKind().equals(TimelineDelegationTokenIdentifier.KIND_NAME)) {
         return;
       }
     }
+    org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier>
+        timelineDelegationToken = getTimelineDelegationToken();
+    if (timelineDelegationToken == null) {
+      return;
+    }
     credentials.addToken(timelineService, timelineDelegationToken);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Add timeline delegation token into credentials: "
@@ -306,6 +305,13 @@ public class YarnClientImpl extends YarnClient {
     clc.setTokens(tokens);
   }
 
+  @VisibleForTesting
+  org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier>
+      getTimelineDelegationToken() throws IOException, YarnException {
+    return timelineClient.getDelegationToken(
+        UserGroupInformation.getCurrentUser().getUserName());
+  }
+
   @Private
   @VisibleForTesting
   protected boolean isSecurityEnabled() {
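The fix reorders the work: the client first scans the tokens already attached to the ContainerLaunchContext and only calls out for a new timeline delegation token when none of the expected kind is found. Comparing token kinds also avoids decodeIdentifier(), which requires the identifier class to be loadable on the client. A minimal, framework-free sketch of the lookup-before-fetch pattern, with hypothetical fetchToken/TIMELINE_KIND names standing in for the YARN calls:

    import java.util.HashMap;
    import java.util.Map;

    public class LookupBeforeFetch {
      // Hypothetical stand-ins for Credentials and the timeline token kind.
      static final String TIMELINE_KIND = "TIMELINE_DELEGATION_TOKEN";
      static final Map<String, String> credentials = new HashMap<>();

      // Expensive remote call; only reached when no token of the kind exists.
      static String fetchToken() {
        System.out.println("fetching a fresh timeline delegation token");
        return "token-bytes";
      }

      static void addTimelineTokenIfAbsent() {
        for (String kind : credentials.keySet()) {
          if (kind.equals(TIMELINE_KIND)) {
            return; // already present: skip the remote fetch entirely
          }
        }
        credentials.put(TIMELINE_KIND, fetchToken());
      }

      public static void main(String[] args) {
        addTimelineTokenIfAbsent(); // fetches once
        addTimelineTokenIfAbsent(); // no-op: token already attached
      }
    }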
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 8259893af37..3c1b1c19908 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.client.api.impl;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -39,6 +41,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -745,10 +748,13 @@ public class TestYarnClient {
     Configuration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
     SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
+    TimelineDelegationTokenIdentifier timelineDT =
+        new TimelineDelegationTokenIdentifier();
     final Token dToken =
-        new Token();
+        new Token(
+            timelineDT.getBytes(), new byte[0], timelineDT.getKind(), new Text());
     // create a mock client
-    YarnClientImpl client = new YarnClientImpl() {
+    YarnClientImpl client = spy(new YarnClientImpl() {
       @Override
       protected void serviceInit(Configuration conf) throws Exception {
         if (getConfig().getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
@@ -784,34 +790,48 @@ public class TestYarnClient {
       public boolean isSecurityEnabled() {
         return true;
       }
-    };
+    });
     client.init(conf);
     client.start();
-    ApplicationSubmissionContext context =
-        mock(ApplicationSubmissionContext.class);
-    ApplicationId applicationId = ApplicationId.newInstance(0, 1);
-    when(context.getApplicationId()).thenReturn(applicationId);
-    DataOutputBuffer dob = new DataOutputBuffer();
-    Credentials credentials = new Credentials();
-    credentials.writeTokenStorageToStream(dob);
-    ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
-        null, null, null, null, tokens, null);
-    when(context.getAMContainerSpec()).thenReturn(clc);
-    client.submitApplication(context);
-    // Check whether token is added or not
-    credentials = new Credentials();
-    DataInputByteBuffer dibb = new DataInputByteBuffer();
-    tokens = clc.getTokens();
-    if (tokens != null) {
-      dibb.reset(tokens);
-      credentials.readTokenStorageStream(dibb);
-      tokens.rewind();
+    try {
+      // when i == 0, timeline DT already exists, no need to get one more
+      // when i == 1, timeline DT doesn't exist, need to get one more
+      for (int i = 0; i < 2; ++i) {
+        ApplicationSubmissionContext context =
+            mock(ApplicationSubmissionContext.class);
+        ApplicationId applicationId = ApplicationId.newInstance(0, i + 1);
+        when(context.getApplicationId()).thenReturn(applicationId);
+        DataOutputBuffer dob = new DataOutputBuffer();
+        Credentials credentials = new Credentials();
+        if (i == 0) {
+          credentials.addToken(client.timelineService, dToken);
+        }
+        credentials.writeTokenStorageToStream(dob);
+        ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+        ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+            null, null, null, null, tokens, null);
+        when(context.getAMContainerSpec()).thenReturn(clc);
+        client.submitApplication(context);
+        if (i == 0) {
+          // GetTimelineDelegationToken shouldn't be called
+          verify(client, never()).getTimelineDelegationToken();
+        }
+        // Either way, the token should be there
+        credentials = new Credentials();
+        DataInputByteBuffer dibb = new DataInputByteBuffer();
+        tokens = clc.getTokens();
+        if (tokens != null) {
+          dibb.reset(tokens);
+          credentials.readTokenStorageStream(dibb);
+          tokens.rewind();
+        }
+        Collection<Token<? extends TokenIdentifier>> dTokens =
+            credentials.getAllTokens();
+        Assert.assertEquals(1, dTokens.size());
+        Assert.assertEquals(dToken, dTokens.iterator().next());
+      }
+    } finally {
+      client.stop();
     }
-    Collection<Token<? extends TokenIdentifier>> dTokens =
-        credentials.getAllTokens();
-    Assert.assertEquals(1, dTokens.size());
-    Assert.assertEquals(dToken, dTokens.iterator().next());
-    client.stop();
   }
 }
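The test relies on Mockito's spy/verify pair: a spy wraps a real object so its methods run normally while interactions are still recorded, and verify(spy, never()) asserts a method was not reached, even when it is invoked internally by another method of the spied object. A minimal sketch of that pattern, using a hypothetical TokenSource class rather than the YARN client:

    import static org.mockito.Mockito.never;
    import static org.mockito.Mockito.spy;
    import static org.mockito.Mockito.verify;

    public class SpyNeverSketch {
      // Hypothetical collaborator: the expensive call we want to prove is skipped.
      static class TokenSource {
        String fetchToken() {
          return "fresh-token";
        }
        String tokenOrCached(boolean cached) {
          return cached ? "cached-token" : fetchToken();
        }
      }

      public static void main(String[] args) {
        TokenSource source = spy(new TokenSource());
        source.tokenOrCached(true);            // cached path: no fetch expected
        verify(source, never()).fetchToken();  // fails if fetchToken() ever ran
        System.out.println("fetchToken() was never called, as expected");
      }
    }

Because Mockito spies are subclass proxies, the self-call from tokenOrCached() to fetchToken() is intercepted too, which is exactly what lets the test above prove that submitApplication() never reached getTimelineDelegationToken().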
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 815b86aa879..ff520be1275 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -76,6 +78,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
@@ -84,6 +87,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -542,6 +546,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     float progress = 0.0f;
     org.apache.hadoop.yarn.api.records.Token amrmToken = null;
     if (allowAccess) {
+      trackingUrl = getDefaultProxyTrackingUrl();
       if (this.currentAttempt != null) {
         currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
         trackingUrl = this.currentAttempt.getTrackingUrl();
@@ -602,6 +607,20 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
   }
 
+  private String getDefaultProxyTrackingUrl() {
+    try {
+      final String scheme = WebAppUtils.getHttpSchemePrefix(conf);
+      String proxy = WebAppUtils.getProxyHostAndPort(conf);
+      URI proxyUri = ProxyUriUtils.getUriFromAMUrl(scheme, proxy);
+      URI result = ProxyUriUtils.getProxyUri(null, proxyUri, applicationId);
+      return result.toASCIIString();
+    } catch (URISyntaxException e) {
+      LOG.warn("Could not generate default proxy tracking URL for "
+          + applicationId);
+      return UNAVAILABLE;
+    }
+  }
+
   @Override
   public long getFinishTime() {
     this.readLock.lock();
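getDefaultProxyTrackingUrl() gives every visible application a usable tracking URL even before any attempt has reported one, degrading to UNAVAILABLE only when the configured proxy address cannot be parsed. A plain-java.net sketch of the same construct-or-fallback shape (the scheme, host, and path here are made-up placeholders, not the WebAppUtils/ProxyUriUtils results):

    import java.net.URI;
    import java.net.URISyntaxException;

    public class DefaultTrackingUrlSketch {
      static final String UNAVAILABLE = "N/A";

      // Build "http://<proxy>/proxy/<appId>/" and fall back when parsing fails.
      static String defaultProxyTrackingUrl(String proxyHostAndPort, String appId) {
        try {
          URI uri = new URI("http://" + proxyHostAndPort + "/proxy/" + appId + "/");
          return uri.toASCIIString();
        } catch (URISyntaxException e) {
          // Mirrors the patch: log and degrade instead of failing app reporting.
          System.err.println("Could not generate default proxy tracking URL for "
              + appId);
          return UNAVAILABLE;
        }
      }

      public static void main(String[] args) {
        System.out.println(defaultProxyTrackingUrl("rm-proxy.example.com:8088",
            "application_1410000000000_0001"));
      }
    }

Note that the default is assigned before the currentAttempt check in createAndGetApplicationReport, so an attempt's real tracking URL still wins whenever one exists.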
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 3ce641662cc..12659587e5d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -544,12 +544,47 @@ public class RMNodeImpl implements RMNode, EventHandler {
       RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
       RMNode newNode = reconnectEvent.getReconnectedNode();
       rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
-      rmNode.httpPort = newNode.getHttpPort();
-      rmNode.httpAddress = newNode.getHttpAddress();
-      rmNode.totalCapability = newNode.getTotalCapability();
+      List runningApps = reconnectEvent.getRunningApplications();
+      boolean noRunningApps =
+          (runningApps == null) || (runningApps.size() == 0);
 
-      // Reset heartbeat ID since node just restarted.
-      rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
+      // No application running on the node, so send node-removal event with
+      // cleaning up old container info.
+      if (noRunningApps) {
+        rmNode.nodeUpdateQueue.clear();
+        rmNode.context.getDispatcher().getEventHandler().handle(
+            new NodeRemovedSchedulerEvent(rmNode));
+
+        if (rmNode.getHttpPort() == newNode.getHttpPort()) {
+          // Reset heartbeat ID since node just restarted.
+          rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
+          if (rmNode.getState() != NodeState.UNHEALTHY) {
+            // Only add new node if old state is not UNHEALTHY
+            rmNode.context.getDispatcher().getEventHandler().handle(
+                new NodeAddedSchedulerEvent(newNode));
+          }
+        } else {
+          // Reconnected node differs, so replace old node and start new node
+          switch (rmNode.getState()) {
+          case RUNNING:
+            ClusterMetrics.getMetrics().decrNumActiveNodes();
+            break;
+          case UNHEALTHY:
+            ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
+            break;
+          }
+          rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
+          rmNode.context.getDispatcher().getEventHandler().handle(
+              new RMNodeStartedEvent(newNode.getNodeID(), null, null));
+        }
+      } else {
+        rmNode.httpPort = newNode.getHttpPort();
+        rmNode.httpAddress = newNode.getHttpAddress();
+        rmNode.totalCapability = newNode.getTotalCapability();
+
+        // Reset heartbeat ID since node just restarted.
+        rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
+      }
 
       if (null != reconnectEvent.getRunningApplications()) {
         for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
@@ -564,7 +599,7 @@ public class RMNodeImpl implements RMNode, EventHandler {
         }
         // Update scheduler node's capacity for reconnect node.
         rmNode.context.getDispatcher().getEventHandler().handle(
             new NodeResourceUpdateSchedulerEvent(rmNode,
-                ResourceOption.newInstance(rmNode.totalCapability, -1)));
+                ResourceOption.newInstance(newNode.getTotalCapability(), -1)));
       }
     }
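The reconnect transition now splits on whether the restarted NodeManager reports running applications: an idle node is removed and re-added (or replaced outright when its HTTP port changed, i.e., a different node instance), while a node with live containers only refreshes its metadata in place so a work-preserving restart is not disturbed. A condensed, framework-free sketch of that decision tree (the Node type and the returned action strings are invented for illustration):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class ReconnectSketch {
      static class Node {
        int httpPort;
        String httpAddress;
        int capabilityMB;
        Node(int port, String addr, int mb) {
          httpPort = port; httpAddress = addr; capabilityMB = mb;
        }
      }

      // Decide how to treat a reconnecting node, mirroring the patched transition.
      static String onReconnect(Node oldNode, Node newNode, List<String> runningApps) {
        boolean noRunningApps = (runningApps == null) || runningApps.isEmpty();
        if (noRunningApps) {
          if (oldNode.httpPort == newNode.httpPort) {
            return "remove-and-readd";  // same node restarted: reset heartbeat state
          }
          return "replace-node";        // different instance: swap in the new node
        }
        // Live containers: update metadata in place, keep the scheduler's view.
        oldNode.httpPort = newNode.httpPort;
        oldNode.httpAddress = newNode.httpAddress;
        oldNode.capabilityMB = newNode.capabilityMB;
        return "update-in-place";
      }

      public static void main(String[] args) {
        Node oldNode = new Node(5678, "host2", 10240);
        Node restarted = new Node(5678, "host2", 15360);
        System.out.println(onReconnect(oldNode, restarted,
            Collections.<String>emptyList()));
        System.out.println(onReconnect(oldNode, restarted,
            Arrays.asList("app_1")));
      }
    }

The one-line change in the second hunk matters for the same reason: in the no-running-apps path rmNode.totalCapability is never updated, so the capacity update event must read newNode.getTotalCapability() instead.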
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 48276205bf9..115f0b40a35 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
@@ -599,6 +600,16 @@ public class TestResourceTrackerService {
     dispatcher.await();
     Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
     Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
+
+    // reconnect of node with changed capability and running applications
+    List runningApps = new ArrayList();
+    runningApps.add(ApplicationId.newInstance(1, 0));
+    nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps);
+    dispatcher.await();
+    response = nm1.nodeHeartbeat(true);
+    dispatcher.await();
+    Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
+    Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
   }
 
   private void writeToHostsFile(String... hosts) throws IOException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index 08749688b64..457f21e061a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -32,8 +32,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 
-import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -75,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -961,6 +960,9 @@ public class TestRMAppTransitions {
     Assert.assertEquals(report.getApplicationResourceUsageReport(),RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT);
     report = app.createAndGetApplicationReport("clientuser", true);
     Assert.assertNotNull(report.getApplicationResourceUsageReport());
+    Assert.assertTrue("bad proxy url for app",
+        report.getTrackingUrl().endsWith("/proxy/" + app.getApplicationId()
+            + "/"));
   }
 
   private void verifyApplicationFinished(RMAppState state) {