From 14b1cf459b574f65f77bee1b1adf22fa03d66ace Mon Sep 17 00:00:00 2001
From: Vinod Kumar Vavilapalli
Date: Tue, 28 Aug 2012 02:05:25 +0000
Subject: [PATCH] MAPREDUCE-4579. Split TestTaskAttempt into two so as to pass
 tests on jdk7. Contributed by Thomas Graves.

svn merge --ignore-ancestry -c 1377943 ../../trunk/

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1377944 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   3 +
 .../v2/app/job/impl/TestTaskAttempt.java      | 119 ++-------------
 .../impl/TestTaskAttemptContainerRequest.java | 154 ++++++++++++++++++
 3 files changed, 173 insertions(+), 103 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index d8f1e0bc556..de157e353a5 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -99,6 +99,9 @@ Release 2.1.0-alpha - Unreleased
     MAPREDUCE-4580. Change MapReduce to use the yarn-client module.
     (Vinod Kumar Vavilapalli via sseth)
 
+    MAPREDUCE-4579. Split TestTaskAttempt into two so as to pass tests on
+    jdk7. (Thomas Graves via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index 3eff0c1cfe2..88e32b337ca 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -28,7 +28,6 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -36,23 +35,17 @@
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
-import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -79,18 +72,14 @@
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
 import org.apache.hadoop.yarn.SystemClock;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
@@ -101,82 +90,6 @@
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class TestTaskAttempt{
-  @Test
-  public void testAttemptContainerRequest() throws Exception {
-    //WARNING: This test must run first. This is because there is an
-    // optimization where the credentials passed in are cached statically so
-    // they do not need to be recomputed when creating a new
-    // ContainerLaunchContext. if other tests run first this code will cache
-    // their credentials and this test will fail trying to look for the
-    // credentials it inserted in.
-    final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
-    final byte[] SECRET_KEY = ("secretkey").getBytes();
-    Map<ApplicationAccessType, String> acls =
-        new HashMap<ApplicationAccessType, String>(1);
-    acls.put(ApplicationAccessType.VIEW_APP, "otheruser");
-    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
-    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
-    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
-    Path jobFile = mock(Path.class);
-
-    EventHandler eventHandler = mock(EventHandler.class);
-    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
-
-    JobConf jobConf = new JobConf();
-    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
-    jobConf.setBoolean("fs.file.impl.disable.cache", true);
-    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
-
-    // setup UGI for security so tokens and keys are preserved
-    jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-    UserGroupInformation.setConfiguration(jobConf);
-
-    Credentials credentials = new Credentials();
-    credentials.addSecretKey(SECRET_KEY_ALIAS, SECRET_KEY);
-    Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
-        ("tokenid").getBytes(), ("tokenpw").getBytes(),
-        new Text("tokenkind"), new Text("tokenservice"));
-
-    TaskAttemptImpl taImpl =
-        new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
-            mock(TaskSplitMetaInfo.class), jobConf, taListener,
-            mock(OutputCommitter.class), jobToken, credentials,
-            new SystemClock(), null);
-
-    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());
-    ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
-
-    ContainerLaunchContext launchCtx =
-        TaskAttemptImpl.createContainerLaunchContext(acls, containerId,
-            jobConf, jobToken, taImpl.createRemoteTask(),
-            TypeConverter.fromYarn(jobId), mock(Resource.class),
-            mock(WrappedJvmID.class), taListener,
-            credentials);
-
-    Assert.assertEquals("ACLs mismatch", acls, launchCtx.getApplicationACLs());
-    Credentials launchCredentials = new Credentials();
-
-    DataInputByteBuffer dibb = new DataInputByteBuffer();
-    dibb.reset(launchCtx.getContainerTokens());
-    launchCredentials.readTokenStorageStream(dibb);
-
-    // verify all tokens specified for the task attempt are in the launch context
-    for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
-      Token<? extends TokenIdentifier> launchToken =
-          launchCredentials.getToken(token.getService());
-      Assert.assertNotNull("Token " + token.getService() + " is missing",
-          launchToken);
-      Assert.assertEquals("Token " + token.getService() + " mismatch",
-          token, launchToken);
-    }
-
-    // verify the secret key is in the launch context
-    Assert.assertNotNull("Secret key missing",
-        launchCredentials.getSecretKey(SECRET_KEY_ALIAS));
-    Assert.assertTrue("Secret key mismatch", Arrays.equals(SECRET_KEY,
-        launchCredentials.getSecretKey(SECRET_KEY_ALIAS)));
-  }
 
   static public class StubbedFS extends RawLocalFileSystem {
     @Override
@@ -227,7 +140,7 @@ public void testSingleRackRequest() throws Exception {
     //Only a single occurrence of /DefaultRack
     assertEquals(1, requestedRacks.length);
   }
-  
+
   @Test
   public void testHostResolveAttempt() throws Exception {
     TaskAttemptImpl.RequestContainerTransition rct =
@@ -266,7 +179,7 @@ public void testHostResolveAttempt() throws Exception {
     }
     assertEquals(0, expected.size());
   }
-  
+
   @Test
   public void testSlotMillisCounterUpdate() throws Exception {
     verifySlotMillis(2048, 2048, 1024);
@@ -325,13 +238,13 @@ public void verifySlotMillis(int mapMemMb, int reduceMemMb,
         .getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_REDUCES)
         .getValue());
   }
-  
+
   private TaskAttemptImpl createMapTaskAttemptImplForTest(
       EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
     Clock clock = new SystemClock();
     return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
   }
-  
+
   private TaskAttemptImpl createMapTaskAttemptImplForTest(
       EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
     ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
@@ -402,30 +315,30 @@ public void handle(JobHistoryEvent event) {
       };
     }
   }
-  
+
   @Test
   public void testLaunchFailedWhileKilling() throws Exception {
     ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
-    ApplicationAttemptId appAttemptId = 
+    ApplicationAttemptId appAttemptId =
       BuilderUtils.newApplicationAttemptId(appId, 0);
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
     TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
     Path jobFile = mock(Path.class);
-    
+
     MockEventHandler eventHandler = new MockEventHandler();
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
     when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
-    
+
     JobConf jobConf = new JobConf();
     jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
     jobConf.setBoolean("fs.file.impl.disable.cache", true);
     jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
     jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
-    
+
     TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
     when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
-    
+
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
           splits, jobConf, taListener,
@@ -437,7 +350,7 @@ public void testLaunchFailedWhileKilling() throws Exception {
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
     when(container.getNodeId()).thenReturn(nid);
-    
+
     taImpl.handle(new TaskAttemptEvent(attemptId,
         TaskAttemptEventType.TA_SCHEDULE));
     taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
@@ -450,7 +363,7 @@ public void testLaunchFailedWhileKilling() throws Exception {
         TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
     assertFalse(eventHandler.internalError);
   }
-  
+
   @Test
   public void testContainerCleanedWhileRunning() throws Exception {
     ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
@@ -565,7 +478,7 @@ public void testContainerCleanedWhileCommitting() throws Exception {
     assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
         eventHandler.internalError);
   }
-  
+
   @Test
   public void testDoubleTooManyFetchFailure() throws Exception {
     ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
@@ -618,7 +531,7 @@ public void testDoubleTooManyFetchFailure() throws Exception {
         TaskAttemptEventType.TA_DONE));
     taImpl.handle(new TaskAttemptEvent(attemptId,
         TaskAttemptEventType.TA_CONTAINER_CLEANED));
-    
+
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
     taImpl.handle(new TaskAttemptEvent(attemptId,
@@ -635,7 +548,7 @@ public void testDoubleTooManyFetchFailure() throws Exception {
 
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;
-    
+
     @Override
     public void handle(Event event) {
       if (event instanceof JobEvent) {
@@ -645,6 +558,6 @@ public void handle(Event event) {
       }
     }
   }
-  
+
   };
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java
new file mode 100644
index 00000000000..fbd23e15553
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.impl;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapTaskAttemptImpl;
+import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+@SuppressWarnings({"rawtypes"})
+public class TestTaskAttemptContainerRequest {
+
+  //WARNING: This test must be the only test in this file. This is because
+  // there is an optimization where the credentials passed in are cached
+  // statically so they do not need to be recomputed when creating a new
+  // ContainerLaunchContext. if other tests run first this code will cache
+  // their credentials and this test will fail trying to look for the
+  // credentials it inserted in.
+
+  @Test
+  public void testAttemptContainerRequest() throws Exception {
+    final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
+    final byte[] SECRET_KEY = ("secretkey").getBytes();
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>(1);
+    acls.put(ApplicationAccessType.VIEW_APP, "otheruser");
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    Path jobFile = mock(Path.class);
+
+    EventHandler eventHandler = mock(EventHandler.class);
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+
+    // setup UGI for security so tokens and keys are preserved
+    jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(jobConf);
+
+    Credentials credentials = new Credentials();
+    credentials.addSecretKey(SECRET_KEY_ALIAS, SECRET_KEY);
+    Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
+        ("tokenid").getBytes(), ("tokenpw").getBytes(),
+        new Text("tokenkind"), new Text("tokenservice"));
+
+    TaskAttemptImpl taImpl =
+        new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+            mock(TaskSplitMetaInfo.class), jobConf, taListener,
+            mock(OutputCommitter.class), jobToken, credentials,
+            new SystemClock(), null);
+
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());
+    ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
+
+    ContainerLaunchContext launchCtx =
+        TaskAttemptImpl.createContainerLaunchContext(acls, containerId,
+            jobConf, jobToken, taImpl.createRemoteTask(),
+            TypeConverter.fromYarn(jobId), mock(Resource.class),
+            mock(WrappedJvmID.class), taListener,
+            credentials);
+
+    Assert.assertEquals("ACLs mismatch", acls, launchCtx.getApplicationACLs());
+    Credentials launchCredentials = new Credentials();
+
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    dibb.reset(launchCtx.getContainerTokens());
+    launchCredentials.readTokenStorageStream(dibb);
+
+    // verify all tokens specified for the task attempt are in the launch context
+    for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
+      Token<? extends TokenIdentifier> launchToken =
+          launchCredentials.getToken(token.getService());
+      Assert.assertNotNull("Token " + token.getService() + " is missing",
+          launchToken);
+      Assert.assertEquals("Token " + token.getService() + " mismatch",
+          token, launchToken);
+    }
+
+    // verify the secret key is in the launch context
+    Assert.assertNotNull("Secret key missing",
+        launchCredentials.getSecretKey(SECRET_KEY_ALIAS));
+    Assert.assertTrue("Secret key mismatch", Arrays.equals(SECRET_KEY,
+        launchCredentials.getSecretKey(SECRET_KEY_ALIAS)));
+  }
+
+  static public class StubbedFS extends RawLocalFileSystem {
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      return new FileStatus(1, false, 1, 1, 1, f);
+    }
+  }
+
+}
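
Note on why the split fixes the JDK 7 failures: JUnit gives no guarantee about the order in which test methods within a class run, and JDK 7 changed Class.getDeclaredMethods() so that methods are no longer returned in source order, so testAttemptContainerRequest could no longer be relied upon to run first and find the static credentials cache empty. The sketch below is a minimal illustration of that first-caller-wins caching pattern; CachedCredentialsHolder and getOrCompute are hypothetical stand-ins for the static ContainerLaunchContext credentials cache described in the WARNING comment, not Hadoop's actual code.

// Hypothetical sketch of a first-caller-wins static cache. Illustration of
// the failure mode only; not Hadoop's real ContainerLaunchContext code.
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;

class CachedCredentialsHolder {

  // Computed once per JVM and then reused for every later container launch.
  private static final AtomicReference<ByteBuffer> CACHE =
      new AtomicReference<ByteBuffer>();

  static ByteBuffer getOrCompute(byte[] serializedCredentials) {
    // First caller wins: whichever test reaches this point first freezes
    // its credentials for the remainder of the test run in this JVM.
    CACHE.compareAndSet(null, ByteBuffer.wrap(serializedCredentials));
    return CACHE.get();
  }
}

With the assertion-bearing test isolated in TestTaskAttemptContainerRequest, it is the only method in its class that can populate the cache, so method ordering within the class no longer matters; and assuming the build forks a fresh JVM per test class (which appears to match Hadoop's Surefire settings of this era, though that is an assumption of this note), static state cannot leak in from TestTaskAttempt either. The new test can also be run on its own with, for example, mvn test -Dtest=TestTaskAttemptContainerRequest.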