diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 99382adee83..c3190c8bb8f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -552,6 +552,9 @@ Release 2.1.0-beta - UNRELEASED MAPREDUCE-4019. -list-attempt-ids is not working (Ashwin Shankar, Devaraj K, and B Anil Kumar via jlowe) + MAPREDUCE-5334. Fix failing unit tests - TestContainerLauncher, + TestContainerLauncherImpl. (Vinod Kumar Vavilapalli via sseth) + BREAKDOWN OF HADOOP-8562 SUBTASKS MAPREDUCE-4739. Some MapReduce tests fail to find winutils. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 94171811c9a..b2732c638f8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -41,12 +41,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; @@ -55,9 +51,6 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -393,8 +386,9 @@ public class ContainerLauncherImpl extends AbstractService implements } } - public ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData getCMProxy( - String containerMgrBindAddr, ContainerId containerId) throws IOException { + public ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData + getCMProxy(String containerMgrBindAddr, ContainerId containerId) + throws IOException { return cmProxy.getProxy(containerMgrBindAddr, containerId); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index 6ea0f67ec80..0033490f136 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; @@ -32,6 +33,7 @@ import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobId; @@ -61,6 +63,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; @@ -70,7 +73,9 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.util.Records; import org.junit.Test; public class TestContainerLauncher { @@ -82,7 +87,7 @@ public class TestContainerLauncher { static final Log LOG = LogFactory.getLog(TestContainerLauncher.class); - @Test + @Test (timeout = 5000) public void testPoolSize() throws InterruptedException { ApplicationId appId = ApplicationId.newInstance(12345, 67); @@ -158,7 +163,7 @@ public class TestContainerLauncher { containerLauncher.stop(); } - @Test + @Test(timeout = 5000) public void testPoolLimits() throws InterruptedException { ApplicationId appId = ApplicationId.newInstance(12345, 67); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( @@ -222,7 +227,7 @@ public class TestContainerLauncher { containerLauncher.numEventsProcessing.get()); } - @Test + @Test(timeout = 15000) public void testSlowNM() throws Exception { conf = new Configuration(); @@ -235,11 +240,19 @@ public class TestContainerLauncher { YarnRPC rpc = YarnRPC.create(conf); String bindAddr = "localhost:0"; InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); - server = rpc.getServer(ContainerManagementProtocol.class, new DummyContainerManager(), - addr, conf, null, 1); + NMTokenSecretManagerInNM tokenSecretManager = + new NMTokenSecretManagerInNM(); + MasterKey masterKey = Records.newRecord(MasterKey.class); + masterKey.setBytes(ByteBuffer.wrap("key".getBytes())); + tokenSecretManager.setMasterKey(masterKey); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "token"); + server = + rpc.getServer(ContainerManagementProtocol.class, + new DummyContainerManager(), addr, conf, tokenSecretManager, 1); server.start(); - MRApp app = new MRAppWithSlowNM(); + MRApp app = new MRAppWithSlowNM(tokenSecretManager); try { Job job = app.submit(conf); @@ -340,8 +353,10 @@ public class TestContainerLauncher { private class MRAppWithSlowNM extends MRApp { - public MRAppWithSlowNM() { + private NMTokenSecretManagerInNM tokenSecretManager; + public MRAppWithSlowNM(NMTokenSecretManagerInNM tokenSecretManager) { super(1, 0, false, "TestContainerLauncher", true); + this.tokenSecretManager = tokenSecretManager; } @Override @@ -353,18 +368,19 @@ public class TestContainerLauncher { public ContainerManagementProtocolProxyData getCMProxy( String containerMgrBindAddr, ContainerId containerId) throws IOException { - Token dummyToken = - Token.newInstance("NMTokenIdentifier".getBytes(), - NMTokenIdentifier.KIND.toString(), "password".getBytes(), - "NMToken"); + InetSocketAddress addr = NetUtils.getConnectAddress(server); + String containerManagerBindAddr = + addr.getHostName() + ":" + addr.getPort(); + Token token = + tokenSecretManager.createNMToken( + containerId.getApplicationAttemptId(), + NodeId.newInstance(addr.getHostName(), addr.getPort()), "user"); ContainerManagementProtocolProxy cmProxy = new ContainerManagementProtocolProxy(conf, context.getNMTokens()); - InetSocketAddress addr = NetUtils.getConnectAddress(server); ContainerManagementProtocolProxyData proxy = cmProxy.new ContainerManagementProtocolProxyData( - YarnRPC.create(conf), - addr.getHostName() + ":" + addr.getPort(), containerId, - dummyToken); + YarnRPC.create(conf), containerManagerBindAddr, containerId, + token); return proxy; } }; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java index cc12ce3bade..dc6ca6a7949 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java @@ -18,7 +18,6 @@ package org.apache.hadoop.mapreduce.v2.app.launcher; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -26,7 +25,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; @@ -59,12 +57,12 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.junit.Before; import org.junit.Test; @@ -88,13 +86,25 @@ public class TestContainerLauncherImpl { private static class ContainerLauncherImplUnderTest extends ContainerLauncherImpl { - private YarnRPC rpc; - - public ContainerLauncherImplUnderTest(AppContext context, YarnRPC rpc) { + private ContainerManagementProtocol containerManager; + + public ContainerLauncherImplUnderTest(AppContext context, + ContainerManagementProtocol containerManager) { super(context); - this.rpc = rpc; + this.containerManager = containerManager; } - + + @Override + public ContainerManagementProtocolProxyData getCMProxy( + String containerMgrBindAddr, ContainerId containerId) + throws IOException { + ContainerManagementProtocolProxyData protocolProxy = + mock(ContainerManagementProtocolProxyData.class); + when(protocolProxy.getContainerManagementProtocol()).thenReturn( + containerManager); + return protocolProxy; + } + public void waitForPoolToIdle() throws InterruptedException { //I wish that we did not need the sleep, but it is here so that we are sure // That the other thread had time to insert the event into the queue and @@ -133,22 +143,18 @@ public class TestContainerLauncherImpl { return MRBuilderUtils.newTaskAttemptId(tID, id); } - @Test + @Test(timeout = 5000) public void testHandle() throws Exception { LOG.info("STARTING testHandle"); - YarnRPC mockRpc = mock(YarnRPC.class); AppContext mockContext = mock(AppContext.class); @SuppressWarnings("rawtypes") EventHandler mockEventHandler = mock(EventHandler.class); when(mockContext.getEventHandler()).thenReturn(mockEventHandler); - - ContainerManagementProtocol mockCM = mock(ContainerManagementProtocol.class); - when(mockRpc.getProxy(eq(ContainerManagementProtocol.class), - any(InetSocketAddress.class), any(Configuration.class))) - .thenReturn(mockCM); - - ContainerLauncherImplUnderTest ut = - new ContainerLauncherImplUnderTest(mockContext, mockRpc); + String cmAddress = "127.0.0.1:8000"; + ContainerManagementProtocol mockCM = + mock(ContainerManagementProtocol.class); + ContainerLauncherImplUnderTest ut = + new ContainerLauncherImplUnderTest(mockContext, mockCM); Configuration conf = new Configuration(); ut.init(conf); @@ -156,7 +162,6 @@ public class TestContainerLauncherImpl { try { ContainerId contId = makeContainerId(0l, 0, 0, 1); TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0); - String cmAddress = "127.0.0.1:8000"; StartContainerResponse startResp = recordFactory.newRecordInstance(StartContainerResponse.class); startResp.setAllServicesMetaData(serviceResponse); @@ -199,22 +204,18 @@ public class TestContainerLauncherImpl { } } - @Test + @Test(timeout = 5000) public void testOutOfOrder() throws Exception { LOG.info("STARTING testOutOfOrder"); - YarnRPC mockRpc = mock(YarnRPC.class); AppContext mockContext = mock(AppContext.class); @SuppressWarnings("rawtypes") EventHandler mockEventHandler = mock(EventHandler.class); when(mockContext.getEventHandler()).thenReturn(mockEventHandler); - ContainerManagementProtocol mockCM = mock(ContainerManagementProtocol.class); - when(mockRpc.getProxy(eq(ContainerManagementProtocol.class), - any(InetSocketAddress.class), any(Configuration.class))) - .thenReturn(mockCM); - - ContainerLauncherImplUnderTest ut = - new ContainerLauncherImplUnderTest(mockContext, mockRpc); + ContainerManagementProtocol mockCM = + mock(ContainerManagementProtocol.class); + ContainerLauncherImplUnderTest ut = + new ContainerLauncherImplUnderTest(mockContext, mockCM); Configuration conf = new Configuration(); ut.init(conf); @@ -264,23 +265,19 @@ public class TestContainerLauncherImpl { } } - @Test + @Test(timeout = 5000) public void testMyShutdown() throws Exception { LOG.info("in test Shutdown"); - YarnRPC mockRpc = mock(YarnRPC.class); AppContext mockContext = mock(AppContext.class); @SuppressWarnings("rawtypes") EventHandler mockEventHandler = mock(EventHandler.class); when(mockContext.getEventHandler()).thenReturn(mockEventHandler); - ContainerManagementProtocol mockCM = mock(ContainerManagementProtocol.class); - when(mockRpc.getProxy(eq(ContainerManagementProtocol.class), - any(InetSocketAddress.class), any(Configuration.class))) - .thenReturn(mockCM); - + ContainerManagementProtocol mockCM = + mock(ContainerManagementProtocol.class); ContainerLauncherImplUnderTest ut = - new ContainerLauncherImplUnderTest(mockContext, mockRpc); + new ContainerLauncherImplUnderTest(mockContext, mockCM); Configuration conf = new Configuration(); ut.init(conf); @@ -320,26 +317,22 @@ public class TestContainerLauncherImpl { } @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test + @Test(timeout = 5000) public void testContainerCleaned() throws Exception { LOG.info("STARTING testContainerCleaned"); CyclicBarrier startLaunchBarrier = new CyclicBarrier(2); CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2); - YarnRPC mockRpc = mock(YarnRPC.class); AppContext mockContext = mock(AppContext.class); EventHandler mockEventHandler = mock(EventHandler.class); when(mockContext.getEventHandler()).thenReturn(mockEventHandler); - ContainerManagementProtocol mockCM = new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier); - when(mockRpc.getProxy(eq(ContainerManagementProtocol.class), - any(InetSocketAddress.class), any(Configuration.class))) - .thenReturn(mockCM); - - ContainerLauncherImplUnderTest ut = - new ContainerLauncherImplUnderTest(mockContext, mockRpc); + ContainerManagementProtocol mockCM = + new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier); + ContainerLauncherImplUnderTest ut = + new ContainerLauncherImplUnderTest(mockContext, mockCM); Configuration conf = new Configuration(); ut.init(conf);