From f2e0a125f44e2a529ac2ff74feb655741fed56ba Mon Sep 17 00:00:00 2001 From: Alejandro Abdelnur Date: Mon, 9 Sep 2013 11:12:45 +0000 Subject: [PATCH] YARN-1144. Unmanaged AMs registering a tracking URI should not be proxy-fied. (tucu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1521039 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../rmapp/attempt/RMAppAttemptImpl.java | 5 +- .../rmapp/attempt/TestRMAppAttemptImpl.java | 77 +++++++++++++++++++ .../attempt/TestRMAppAttemptTransitions.java | 56 ++++++++------ 4 files changed, 116 insertions(+), 25 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptImpl.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c20d715a3e4..17e53641ff0 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -165,6 +165,9 @@ Release 2.1.1-beta - UNRELEASED YARN-1049. ContainerExistStatus should define a status for preempted containers. (tucu) + YARN-1144. Unmanaged AMs registering a tracking URI should not be + proxy-fied. (tucu) + Release 2.1.0-beta - 2013-08-22 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 00397cfa650..51e6dc9f10d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -440,7 +440,8 @@ public int getRpcPort() { public String getTrackingUrl() { this.readLock.lock(); try { - return this.proxiedTrackingUrl; + return (getSubmissionContext().getUnmanagedAM()) ? + this.origTrackingUrl : this.proxiedTrackingUrl; } finally { this.readLock.unlock(); } @@ -961,7 +962,7 @@ public void transition(RMAppAttemptImpl appAttempt, } } - private static final class AMRegisteredTransition extends BaseTransition { + static final class AMRegisteredTransition extends BaseTransition { @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptImpl.java new file mode 100644 index 00000000000..e69d867e0f4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptImpl.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; + +import junit.framework.Assert; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; + +import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestRMAppAttemptImpl { + + private void testTrackingUrl(String url, boolean unmanaged) { + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance + (ApplicationId.newInstance(1, 2), 1); + EventHandler handler = Mockito.mock(EventHandler.class); + Dispatcher dispatcher = Mockito.mock(Dispatcher.class); + Mockito.when(dispatcher.getEventHandler()).thenReturn(handler); + RMContext rmContext = Mockito.mock(RMContext.class); + Mockito.when(rmContext.getDispatcher()).thenReturn(dispatcher); + + ApplicationSubmissionContext appContext = + Mockito.mock(ApplicationSubmissionContext.class); + Mockito.when(appContext.getUnmanagedAM()).thenReturn(unmanaged); + + RMAppAttemptImpl attempt = new RMAppAttemptImpl(attemptId, rmContext, null, + null, appContext, new YarnConfiguration(), null); + RMAppAttemptRegistrationEvent event = + Mockito.mock(RMAppAttemptRegistrationEvent.class); + Mockito.when(event.getHost()).thenReturn("h"); + Mockito.when(event.getRpcport()).thenReturn(0); + Mockito.when(event.getTrackingurl()).thenReturn(url); + new RMAppAttemptImpl.AMRegisteredTransition().transition(attempt, event); + if (unmanaged) { + Assert.assertEquals(url, attempt.getTrackingUrl()); + } else { + Assert.assertNotSame(url, attempt.getTrackingUrl()); + Assert.assertTrue(attempt.getTrackingUrl().contains( + ProxyUriUtils.PROXY_SERVLET_NAME)); + Assert.assertTrue(attempt.getTrackingUrl().contains( + attemptId.getApplicationId().toString())); + } + } + + @Test + public void testTrackingUrlUnmanagedAM() { + testTrackingUrl("http://foo:8000/x", true); + } + + @Test + public void testTrackingUrlManagedAM() { + testTrackingUrl("bar:8000/x", false); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 5261d077d5c..2290284eaa8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -408,16 +408,19 @@ private void testAppAttemptLaunchedState(Container container) { * {@link RMAppAttemptState#RUNNING} */ private void testAppAttemptRunningState(Container container, - String host, int rpcPort, String trackingUrl) { + String host, int rpcPort, String trackingUrl, boolean unmanagedAM) { assertEquals(RMAppAttemptState.RUNNING, applicationAttempt.getAppAttemptState()); assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(host, applicationAttempt.getHost()); assertEquals(rpcPort, applicationAttempt.getRpcPort()); assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl()); - assertEquals(getProxyUrl(applicationAttempt), - applicationAttempt.getTrackingUrl()); - + if (unmanagedAM) { + assertEquals("oldtrackingurl", applicationAttempt.getTrackingUrl()); + } else { + assertEquals(getProxyUrl(applicationAttempt), + applicationAttempt.getTrackingUrl()); + } // TODO - need to add more checks relevant to this state } @@ -446,13 +449,18 @@ private void testAppAttemptFinishedState(Container container, FinalApplicationStatus finalStatus, String trackingUrl, String diagnostics, - int finishedContainerCount) { + int finishedContainerCount, boolean unmanagedAM) { assertEquals(RMAppAttemptState.FINISHED, applicationAttempt.getAppAttemptState()); assertEquals(diagnostics, applicationAttempt.getDiagnostics()); assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl()); - assertEquals(getProxyUrl(applicationAttempt), - applicationAttempt.getTrackingUrl()); + if (unmanagedAM) { + assertEquals("mytrackingurl", applicationAttempt.getTrackingUrl()); + + } else { + assertEquals(getProxyUrl(applicationAttempt), + applicationAttempt.getTrackingUrl()); + } assertEquals(finishedContainerCount, applicationAttempt .getJustFinishedContainers().size()); assertEquals(container, applicationAttempt.getMasterContainer()); @@ -535,13 +543,14 @@ private void launchApplicationAttempt(Container container) { private void runApplicationAttempt(Container container, String host, int rpcPort, - String trackingUrl) { + String trackingUrl, boolean unmanagedAM) { applicationAttempt.handle( new RMAppAttemptRegistrationEvent( applicationAttempt.getAppAttemptId(), host, rpcPort, trackingUrl)); - testAppAttemptRunningState(container, host, rpcPort, trackingUrl); + testAppAttemptRunningState(container, host, rpcPort, trackingUrl, + unmanagedAM); } private void unregisterApplicationAttempt(Container container, @@ -567,7 +576,7 @@ public void testUnmanagedAMSuccess() { applicationAttempt.getAppAttemptId()); // launch AM - runApplicationAttempt(null, "host", 8042, "oldtrackingurl"); + runApplicationAttempt(null, "host", 8042, "oldtrackingurl", true); // complete a container applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent( @@ -581,7 +590,8 @@ public void testUnmanagedAMSuccess() { applicationAttempt.handle(new RMAppAttemptUnregistrationEvent( applicationAttempt.getAppAttemptId(), trackingUrl, finalStatus, diagnostics)); - testAppAttemptFinishedState(null, finalStatus, trackingUrl, diagnostics, 1); + testAppAttemptFinishedState(null, finalStatus, trackingUrl, diagnostics, 1, + true); } @Test @@ -690,7 +700,7 @@ public void testAMCrashAtAllocated() { public void testRunningToFailed() { Container amContainer = allocateApplicationAttempt(); launchApplicationAttempt(amContainer); - runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl"); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); String containerDiagMsg = "some error"; int exitCode = 123; ContainerStatus cs = BuilderUtils.newContainerStatus(amContainer.getId(), @@ -713,7 +723,7 @@ public void testRunningToFailed() { public void testRunningToKilled() { Container amContainer = allocateApplicationAttempt(); launchApplicationAttempt(amContainer); - runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl"); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); applicationAttempt.handle( new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), @@ -751,7 +761,7 @@ public void testLaunchedExpire() { public void testRunningExpire() { Container amContainer = allocateApplicationAttempt(); launchApplicationAttempt(amContainer); - runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl"); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); assertEquals(RMAppAttemptState.FAILED, @@ -769,7 +779,7 @@ public void testRunningExpire() { public void testUnregisterToKilledFinishing() { Container amContainer = allocateApplicationAttempt(); launchApplicationAttempt(amContainer); - runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl"); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); unregisterApplicationAttempt(amContainer, FinalApplicationStatus.KILLED, "newtrackingurl", "Killed by user"); @@ -780,14 +790,14 @@ public void testUnregisterToKilledFinishing() { public void testNoTrackingUrl() { Container amContainer = allocateApplicationAttempt(); launchApplicationAttempt(amContainer); - runApplicationAttempt(amContainer, "host", 8042, ""); + runApplicationAttempt(amContainer, "host", 8042, "", false); } @Test public void testUnregisterToSuccessfulFinishing() { Container amContainer = allocateApplicationAttempt(); launchApplicationAttempt(amContainer); - runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl"); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); unregisterApplicationAttempt(amContainer, FinalApplicationStatus.SUCCEEDED, "mytrackingurl", "Successful"); } @@ -796,7 +806,7 @@ public void testUnregisterToSuccessfulFinishing() { public void testFinishingKill() { Container amContainer = allocateApplicationAttempt(); launchApplicationAttempt(amContainer); - runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl"); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED; String trackingUrl = "newtrackingurl"; String diagnostics = "Job failed"; @@ -814,7 +824,7 @@ public void testFinishingKill() { public void testFinishingExpire() { Container amContainer = allocateApplicationAttempt(); launchApplicationAttempt(amContainer); - runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl"); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; String trackingUrl = "mytrackingurl"; String diagnostics = "Successful"; @@ -825,14 +835,14 @@ public void testFinishingExpire() { applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, - diagnostics, 0); + diagnostics, 0, false); } @Test public void testFinishingToFinishing() { Container amContainer = allocateApplicationAttempt(); launchApplicationAttempt(amContainer); - runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl"); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; String trackingUrl = "mytrackingurl"; String diagnostics = "Successful"; @@ -854,7 +864,7 @@ public void testFinishingToFinishing() { public void testSuccessfulFinishingToFinished() { Container amContainer = allocateApplicationAttempt(); launchApplicationAttempt(amContainer); - runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl"); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; String trackingUrl = "mytrackingurl"; String diagnostics = "Successful"; @@ -866,7 +876,7 @@ public void testSuccessfulFinishingToFinished() { BuilderUtils.newContainerStatus(amContainer.getId(), ContainerState.COMPLETE, "", 0))); testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, - diagnostics, 0); + diagnostics, 0, false); } private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {