YARN-1144. Unmanaged AMs registering a tracking URI should not be proxy-fied. (tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1521044 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6a7607594f
commit
1680bf7061
|
@ -150,6 +150,9 @@ Release 2.1.1-beta - UNRELEASED
|
||||||
YARN-1049. ContainerExistStatus should define a status for preempted
|
YARN-1049. ContainerExistStatus should define a status for preempted
|
||||||
containers. (tucu)
|
containers. (tucu)
|
||||||
|
|
||||||
|
YARN-1144. Unmanaged AMs registering a tracking URI should not be
|
||||||
|
proxy-fied. (tucu)
|
||||||
|
|
||||||
Release 2.1.0-beta - 2013-08-22
|
Release 2.1.0-beta - 2013-08-22
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -440,7 +440,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
public String getTrackingUrl() {
|
public String getTrackingUrl() {
|
||||||
this.readLock.lock();
|
this.readLock.lock();
|
||||||
try {
|
try {
|
||||||
return this.proxiedTrackingUrl;
|
return (getSubmissionContext().getUnmanagedAM()) ?
|
||||||
|
this.origTrackingUrl : this.proxiedTrackingUrl;
|
||||||
} finally {
|
} finally {
|
||||||
this.readLock.unlock();
|
this.readLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -961,7 +962,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class AMRegisteredTransition extends BaseTransition {
|
static final class AMRegisteredTransition extends BaseTransition {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMAppAttemptImpl appAttempt,
|
public void transition(RMAppAttemptImpl appAttempt,
|
||||||
RMAppAttemptEvent event) {
|
RMAppAttemptEvent event) {
|
||||||
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
public class TestRMAppAttemptImpl {
|
||||||
|
|
||||||
|
private void testTrackingUrl(String url, boolean unmanaged) {
|
||||||
|
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance
|
||||||
|
(ApplicationId.newInstance(1, 2), 1);
|
||||||
|
EventHandler handler = Mockito.mock(EventHandler.class);
|
||||||
|
Dispatcher dispatcher = Mockito.mock(Dispatcher.class);
|
||||||
|
Mockito.when(dispatcher.getEventHandler()).thenReturn(handler);
|
||||||
|
RMContext rmContext = Mockito.mock(RMContext.class);
|
||||||
|
Mockito.when(rmContext.getDispatcher()).thenReturn(dispatcher);
|
||||||
|
|
||||||
|
ApplicationSubmissionContext appContext =
|
||||||
|
Mockito.mock(ApplicationSubmissionContext.class);
|
||||||
|
Mockito.when(appContext.getUnmanagedAM()).thenReturn(unmanaged);
|
||||||
|
|
||||||
|
RMAppAttemptImpl attempt = new RMAppAttemptImpl(attemptId, rmContext, null,
|
||||||
|
null, appContext, new YarnConfiguration(), null);
|
||||||
|
RMAppAttemptRegistrationEvent event =
|
||||||
|
Mockito.mock(RMAppAttemptRegistrationEvent.class);
|
||||||
|
Mockito.when(event.getHost()).thenReturn("h");
|
||||||
|
Mockito.when(event.getRpcport()).thenReturn(0);
|
||||||
|
Mockito.when(event.getTrackingurl()).thenReturn(url);
|
||||||
|
new RMAppAttemptImpl.AMRegisteredTransition().transition(attempt, event);
|
||||||
|
if (unmanaged) {
|
||||||
|
Assert.assertEquals(url, attempt.getTrackingUrl());
|
||||||
|
} else {
|
||||||
|
Assert.assertNotSame(url, attempt.getTrackingUrl());
|
||||||
|
Assert.assertTrue(attempt.getTrackingUrl().contains(
|
||||||
|
ProxyUriUtils.PROXY_SERVLET_NAME));
|
||||||
|
Assert.assertTrue(attempt.getTrackingUrl().contains(
|
||||||
|
attemptId.getApplicationId().toString()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTrackingUrlUnmanagedAM() {
|
||||||
|
testTrackingUrl("http://foo:8000/x", true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTrackingUrlManagedAM() {
|
||||||
|
testTrackingUrl("bar:8000/x", false);
|
||||||
|
}
|
||||||
|
}
|
|
@ -408,16 +408,19 @@ public class TestRMAppAttemptTransitions {
|
||||||
* {@link RMAppAttemptState#RUNNING}
|
* {@link RMAppAttemptState#RUNNING}
|
||||||
*/
|
*/
|
||||||
private void testAppAttemptRunningState(Container container,
|
private void testAppAttemptRunningState(Container container,
|
||||||
String host, int rpcPort, String trackingUrl) {
|
String host, int rpcPort, String trackingUrl, boolean unmanagedAM) {
|
||||||
assertEquals(RMAppAttemptState.RUNNING,
|
assertEquals(RMAppAttemptState.RUNNING,
|
||||||
applicationAttempt.getAppAttemptState());
|
applicationAttempt.getAppAttemptState());
|
||||||
assertEquals(container, applicationAttempt.getMasterContainer());
|
assertEquals(container, applicationAttempt.getMasterContainer());
|
||||||
assertEquals(host, applicationAttempt.getHost());
|
assertEquals(host, applicationAttempt.getHost());
|
||||||
assertEquals(rpcPort, applicationAttempt.getRpcPort());
|
assertEquals(rpcPort, applicationAttempt.getRpcPort());
|
||||||
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
|
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
|
||||||
assertEquals(getProxyUrl(applicationAttempt),
|
if (unmanagedAM) {
|
||||||
applicationAttempt.getTrackingUrl());
|
assertEquals("oldtrackingurl", applicationAttempt.getTrackingUrl());
|
||||||
|
} else {
|
||||||
|
assertEquals(getProxyUrl(applicationAttempt),
|
||||||
|
applicationAttempt.getTrackingUrl());
|
||||||
|
}
|
||||||
// TODO - need to add more checks relevant to this state
|
// TODO - need to add more checks relevant to this state
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -446,13 +449,18 @@ public class TestRMAppAttemptTransitions {
|
||||||
FinalApplicationStatus finalStatus,
|
FinalApplicationStatus finalStatus,
|
||||||
String trackingUrl,
|
String trackingUrl,
|
||||||
String diagnostics,
|
String diagnostics,
|
||||||
int finishedContainerCount) {
|
int finishedContainerCount, boolean unmanagedAM) {
|
||||||
assertEquals(RMAppAttemptState.FINISHED,
|
assertEquals(RMAppAttemptState.FINISHED,
|
||||||
applicationAttempt.getAppAttemptState());
|
applicationAttempt.getAppAttemptState());
|
||||||
assertEquals(diagnostics, applicationAttempt.getDiagnostics());
|
assertEquals(diagnostics, applicationAttempt.getDiagnostics());
|
||||||
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
|
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
|
||||||
assertEquals(getProxyUrl(applicationAttempt),
|
if (unmanagedAM) {
|
||||||
applicationAttempt.getTrackingUrl());
|
assertEquals("mytrackingurl", applicationAttempt.getTrackingUrl());
|
||||||
|
|
||||||
|
} else {
|
||||||
|
assertEquals(getProxyUrl(applicationAttempt),
|
||||||
|
applicationAttempt.getTrackingUrl());
|
||||||
|
}
|
||||||
assertEquals(finishedContainerCount, applicationAttempt
|
assertEquals(finishedContainerCount, applicationAttempt
|
||||||
.getJustFinishedContainers().size());
|
.getJustFinishedContainers().size());
|
||||||
assertEquals(container, applicationAttempt.getMasterContainer());
|
assertEquals(container, applicationAttempt.getMasterContainer());
|
||||||
|
@ -535,13 +543,14 @@ public class TestRMAppAttemptTransitions {
|
||||||
private void runApplicationAttempt(Container container,
|
private void runApplicationAttempt(Container container,
|
||||||
String host,
|
String host,
|
||||||
int rpcPort,
|
int rpcPort,
|
||||||
String trackingUrl) {
|
String trackingUrl, boolean unmanagedAM) {
|
||||||
applicationAttempt.handle(
|
applicationAttempt.handle(
|
||||||
new RMAppAttemptRegistrationEvent(
|
new RMAppAttemptRegistrationEvent(
|
||||||
applicationAttempt.getAppAttemptId(),
|
applicationAttempt.getAppAttemptId(),
|
||||||
host, rpcPort, trackingUrl));
|
host, rpcPort, trackingUrl));
|
||||||
|
|
||||||
testAppAttemptRunningState(container, host, rpcPort, trackingUrl);
|
testAppAttemptRunningState(container, host, rpcPort, trackingUrl,
|
||||||
|
unmanagedAM);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void unregisterApplicationAttempt(Container container,
|
private void unregisterApplicationAttempt(Container container,
|
||||||
|
@ -567,7 +576,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
applicationAttempt.getAppAttemptId());
|
applicationAttempt.getAppAttemptId());
|
||||||
|
|
||||||
// launch AM
|
// launch AM
|
||||||
runApplicationAttempt(null, "host", 8042, "oldtrackingurl");
|
runApplicationAttempt(null, "host", 8042, "oldtrackingurl", true);
|
||||||
|
|
||||||
// complete a container
|
// complete a container
|
||||||
applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent(
|
applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent(
|
||||||
|
@ -581,7 +590,8 @@ public class TestRMAppAttemptTransitions {
|
||||||
applicationAttempt.handle(new RMAppAttemptUnregistrationEvent(
|
applicationAttempt.handle(new RMAppAttemptUnregistrationEvent(
|
||||||
applicationAttempt.getAppAttemptId(), trackingUrl, finalStatus,
|
applicationAttempt.getAppAttemptId(), trackingUrl, finalStatus,
|
||||||
diagnostics));
|
diagnostics));
|
||||||
testAppAttemptFinishedState(null, finalStatus, trackingUrl, diagnostics, 1);
|
testAppAttemptFinishedState(null, finalStatus, trackingUrl, diagnostics, 1,
|
||||||
|
true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -690,7 +700,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
public void testRunningToFailed() {
|
public void testRunningToFailed() {
|
||||||
Container amContainer = allocateApplicationAttempt();
|
Container amContainer = allocateApplicationAttempt();
|
||||||
launchApplicationAttempt(amContainer);
|
launchApplicationAttempt(amContainer);
|
||||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||||
String containerDiagMsg = "some error";
|
String containerDiagMsg = "some error";
|
||||||
int exitCode = 123;
|
int exitCode = 123;
|
||||||
ContainerStatus cs = BuilderUtils.newContainerStatus(amContainer.getId(),
|
ContainerStatus cs = BuilderUtils.newContainerStatus(amContainer.getId(),
|
||||||
|
@ -713,7 +723,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
public void testRunningToKilled() {
|
public void testRunningToKilled() {
|
||||||
Container amContainer = allocateApplicationAttempt();
|
Container amContainer = allocateApplicationAttempt();
|
||||||
launchApplicationAttempt(amContainer);
|
launchApplicationAttempt(amContainer);
|
||||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||||
applicationAttempt.handle(
|
applicationAttempt.handle(
|
||||||
new RMAppAttemptEvent(
|
new RMAppAttemptEvent(
|
||||||
applicationAttempt.getAppAttemptId(),
|
applicationAttempt.getAppAttemptId(),
|
||||||
|
@ -751,7 +761,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
public void testRunningExpire() {
|
public void testRunningExpire() {
|
||||||
Container amContainer = allocateApplicationAttempt();
|
Container amContainer = allocateApplicationAttempt();
|
||||||
launchApplicationAttempt(amContainer);
|
launchApplicationAttempt(amContainer);
|
||||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||||
applicationAttempt.handle(new RMAppAttemptEvent(
|
applicationAttempt.handle(new RMAppAttemptEvent(
|
||||||
applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
|
applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
|
||||||
assertEquals(RMAppAttemptState.FAILED,
|
assertEquals(RMAppAttemptState.FAILED,
|
||||||
|
@ -769,7 +779,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
public void testUnregisterToKilledFinishing() {
|
public void testUnregisterToKilledFinishing() {
|
||||||
Container amContainer = allocateApplicationAttempt();
|
Container amContainer = allocateApplicationAttempt();
|
||||||
launchApplicationAttempt(amContainer);
|
launchApplicationAttempt(amContainer);
|
||||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||||
unregisterApplicationAttempt(amContainer,
|
unregisterApplicationAttempt(amContainer,
|
||||||
FinalApplicationStatus.KILLED, "newtrackingurl",
|
FinalApplicationStatus.KILLED, "newtrackingurl",
|
||||||
"Killed by user");
|
"Killed by user");
|
||||||
|
@ -780,14 +790,14 @@ public class TestRMAppAttemptTransitions {
|
||||||
public void testNoTrackingUrl() {
|
public void testNoTrackingUrl() {
|
||||||
Container amContainer = allocateApplicationAttempt();
|
Container amContainer = allocateApplicationAttempt();
|
||||||
launchApplicationAttempt(amContainer);
|
launchApplicationAttempt(amContainer);
|
||||||
runApplicationAttempt(amContainer, "host", 8042, "");
|
runApplicationAttempt(amContainer, "host", 8042, "", false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnregisterToSuccessfulFinishing() {
|
public void testUnregisterToSuccessfulFinishing() {
|
||||||
Container amContainer = allocateApplicationAttempt();
|
Container amContainer = allocateApplicationAttempt();
|
||||||
launchApplicationAttempt(amContainer);
|
launchApplicationAttempt(amContainer);
|
||||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||||
unregisterApplicationAttempt(amContainer,
|
unregisterApplicationAttempt(amContainer,
|
||||||
FinalApplicationStatus.SUCCEEDED, "mytrackingurl", "Successful");
|
FinalApplicationStatus.SUCCEEDED, "mytrackingurl", "Successful");
|
||||||
}
|
}
|
||||||
|
@ -796,7 +806,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
public void testFinishingKill() {
|
public void testFinishingKill() {
|
||||||
Container amContainer = allocateApplicationAttempt();
|
Container amContainer = allocateApplicationAttempt();
|
||||||
launchApplicationAttempt(amContainer);
|
launchApplicationAttempt(amContainer);
|
||||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED;
|
FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED;
|
||||||
String trackingUrl = "newtrackingurl";
|
String trackingUrl = "newtrackingurl";
|
||||||
String diagnostics = "Job failed";
|
String diagnostics = "Job failed";
|
||||||
|
@ -814,7 +824,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
public void testFinishingExpire() {
|
public void testFinishingExpire() {
|
||||||
Container amContainer = allocateApplicationAttempt();
|
Container amContainer = allocateApplicationAttempt();
|
||||||
launchApplicationAttempt(amContainer);
|
launchApplicationAttempt(amContainer);
|
||||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
||||||
String trackingUrl = "mytrackingurl";
|
String trackingUrl = "mytrackingurl";
|
||||||
String diagnostics = "Successful";
|
String diagnostics = "Successful";
|
||||||
|
@ -825,14 +835,14 @@ public class TestRMAppAttemptTransitions {
|
||||||
applicationAttempt.getAppAttemptId(),
|
applicationAttempt.getAppAttemptId(),
|
||||||
RMAppAttemptEventType.EXPIRE));
|
RMAppAttemptEventType.EXPIRE));
|
||||||
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
|
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
|
||||||
diagnostics, 0);
|
diagnostics, 0, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFinishingToFinishing() {
|
public void testFinishingToFinishing() {
|
||||||
Container amContainer = allocateApplicationAttempt();
|
Container amContainer = allocateApplicationAttempt();
|
||||||
launchApplicationAttempt(amContainer);
|
launchApplicationAttempt(amContainer);
|
||||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
||||||
String trackingUrl = "mytrackingurl";
|
String trackingUrl = "mytrackingurl";
|
||||||
String diagnostics = "Successful";
|
String diagnostics = "Successful";
|
||||||
|
@ -854,7 +864,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
public void testSuccessfulFinishingToFinished() {
|
public void testSuccessfulFinishingToFinished() {
|
||||||
Container amContainer = allocateApplicationAttempt();
|
Container amContainer = allocateApplicationAttempt();
|
||||||
launchApplicationAttempt(amContainer);
|
launchApplicationAttempt(amContainer);
|
||||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
||||||
String trackingUrl = "mytrackingurl";
|
String trackingUrl = "mytrackingurl";
|
||||||
String diagnostics = "Successful";
|
String diagnostics = "Successful";
|
||||||
|
@ -866,7 +876,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
BuilderUtils.newContainerStatus(amContainer.getId(),
|
BuilderUtils.newContainerStatus(amContainer.getId(),
|
||||||
ContainerState.COMPLETE, "", 0)));
|
ContainerState.COMPLETE, "", 0)));
|
||||||
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
|
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
|
||||||
diagnostics, 0);
|
diagnostics, 0, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
|
private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
|
||||||
|
|
Loading…
Reference in New Issue