YARN-7974. Allow updating application tracking url after registration. Contributed by Jonathan Hung

This commit is contained in:
Jonathan Hung 2018-07-30 17:41:01 -07:00
parent 207f08bab4
commit 0d6e1a2aab
13 changed files with 299 additions and 4 deletions

View File

@ -72,6 +72,20 @@ public abstract class AllocateRequest {
.resourceBlacklistRequest(resourceBlacklistRequest).build(); .resourceBlacklistRequest(resourceBlacklistRequest).build();
} }
@Public
@Unstable
public static AllocateRequest newInstance(int responseID, float appProgress,
List<ResourceRequest> resourceAsk,
List<ContainerId> containersToBeReleased,
ResourceBlacklistRequest resourceBlacklistRequest,
String trackingUrl) {
return AllocateRequest.newBuilder().responseId(responseID)
.progress(appProgress).askList(resourceAsk)
.releaseList(containersToBeReleased)
.resourceBlacklistRequest(resourceBlacklistRequest)
.trackingUrl(trackingUrl).build();
}
@Public @Public
@Unstable @Unstable
public static AllocateRequest newInstance(int responseID, float appProgress, public static AllocateRequest newInstance(int responseID, float appProgress,
@ -212,6 +226,22 @@ public abstract class AllocateRequest {
public abstract void setUpdateRequests( public abstract void setUpdateRequests(
List<UpdateContainerRequest> updateRequests); List<UpdateContainerRequest> updateRequests);
/**
* Get the tracking url update for this heartbeat.
* @return tracking url to update this application with
*/
@Public
@Unstable
public abstract String getTrackingUrl();
/**
* Set the new tracking url for this application.
* @param trackingUrl the new tracking url
*/
@Public
@Unstable
public abstract void setTrackingUrl(String trackingUrl);
@Public @Public
@Unstable @Unstable
public static AllocateRequestBuilder newBuilder() { public static AllocateRequestBuilder newBuilder() {
@ -313,6 +343,19 @@ public abstract class AllocateRequest {
return this; return this;
} }
/**
* Set the <code>trackingUrl</code> of the request.
* @see AllocateRequest#setTrackingUrl(String)
* @param trackingUrl new tracking url
* @return {@link AllocateRequestBuilder}
*/
@Public
@Unstable
public AllocateRequestBuilder trackingUrl(String trackingUrl) {
allocateRequest.setTrackingUrl(trackingUrl);
return this;
}
/** /**
* Return generated {@link AllocateRequest} object. * Return generated {@link AllocateRequest} object.
* @return {@link AllocateRequest} * @return {@link AllocateRequest}

View File

@ -89,6 +89,7 @@ message AllocateRequestProto {
optional int32 response_id = 4; optional int32 response_id = 4;
optional float progress = 5; optional float progress = 5;
repeated UpdateContainerRequestProto update_requests = 7; repeated UpdateContainerRequestProto update_requests = 7;
optional string tracking_url = 11;
} }
message NMTokenProto { message NMTokenProto {

View File

@ -701,6 +701,17 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
return this.timelineV2Client; return this.timelineV2Client;
} }
/**
* Update application's tracking url on next heartbeat.
*
* @param trackingUrl new tracking url for this application
*/
@Public
@InterfaceStability.Unstable
public void updateTrackingUrl(String trackingUrl) {
// Unimplemented.
}
/** /**
* Wait for <code>check</code> to return true for each 1000 ms. * Wait for <code>check</code> to return true for each 1000 ms.
* See also {@link #waitFor(java.util.function.Supplier, int)} * See also {@link #waitFor(java.util.function.Supplier, int)}

View File

@ -374,6 +374,17 @@ extends AbstractService {
public abstract void updateBlacklist(List<String> blacklistAdditions, public abstract void updateBlacklist(List<String> blacklistAdditions,
List<String> blacklistRemovals); List<String> blacklistRemovals);
/**
* Update application's tracking url on next heartbeat.
*
* @param trackingUrl new tracking url for this application
*/
@Public
@Unstable
public void updateTrackingUrl(String trackingUrl) {
// Unimplemented.
}
/** /**
* Wait for <code>check</code> to return true for each 1000 ms. * Wait for <code>check</code> to return true for each 1000 ms.
* See also {@link #waitFor(java.util.function.Supplier, int)} * See also {@link #waitFor(java.util.function.Supplier, int)}

View File

@ -250,6 +250,11 @@ extends AMRMClientAsync<T> {
client.updateBlacklist(blacklistAdditions, blacklistRemovals); client.updateBlacklist(blacklistAdditions, blacklistRemovals);
} }
@Override
public void updateTrackingUrl(String trackingUrl) {
client.updateTrackingUrl(trackingUrl);
}
private class HeartbeatThread extends Thread { private class HeartbeatThread extends Thread {
public HeartbeatThread() { public HeartbeatThread() {
super("AMRM Heartbeater thread"); super("AMRM Heartbeater thread");

View File

@ -95,6 +95,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
protected String appHostName; protected String appHostName;
protected int appHostPort; protected int appHostPort;
protected String appTrackingUrl; protected String appTrackingUrl;
protected String newTrackingUrl;
protected ApplicationMasterProtocol rmClient; protected ApplicationMasterProtocol rmClient;
protected Resource clusterAvailableResources; protected Resource clusterAvailableResources;
@ -281,6 +282,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
.responseId(lastResponseId).progress(progressIndicator) .responseId(lastResponseId).progress(progressIndicator)
.askList(askList).resourceBlacklistRequest(blacklistRequest) .askList(askList).resourceBlacklistRequest(blacklistRequest)
.releaseList(releaseList).updateRequests(updateList).build(); .releaseList(releaseList).updateRequests(updateList).build();
if (this.newTrackingUrl != null) {
allocateRequest.setTrackingUrl(this.newTrackingUrl);
this.appTrackingUrl = this.newTrackingUrl;
this.newTrackingUrl = null;
}
// clear blacklistAdditions and blacklistRemovals before // clear blacklistAdditions and blacklistRemovals before
// unsynchronized part // unsynchronized part
blacklistAdditions.clear(); blacklistAdditions.clear();
@ -932,6 +939,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
} }
} }
@Override
public synchronized void updateTrackingUrl(String trackingUrl) {
this.newTrackingUrl = trackingUrl;
}
private void updateAMRMToken(Token token) throws IOException { private void updateAMRMToken(Token token) throws IOException {
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken = org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token

View File

@ -20,10 +20,12 @@ package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
@ -88,6 +90,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -2052,4 +2055,78 @@ public class TestAMRMClient {
} }
return result; return result;
} }
@Test(timeout = 60000)
public void testNoUpdateTrackingUrl() {
try {
AMRMClientImpl<ContainerRequest> amClient = null;
amClient = new AMRMClientImpl<>();
amClient.init(conf);
amClient.start();
amClient.registerApplicationMaster("Host", 10000, "");
assertEquals("", amClient.appTrackingUrl);
ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol.class);
AllocateResponse mockResponse = mock(AllocateResponse.class);
when(mockRM.allocate(any(AllocateRequest.class)))
.thenReturn(mockResponse);
ApplicationMasterProtocol realRM = amClient.rmClient;
amClient.rmClient = mockRM;
// Do allocate without updated tracking url
amClient.allocate(0.1f);
ArgumentCaptor<AllocateRequest> argument =
ArgumentCaptor.forClass(AllocateRequest.class);
verify(mockRM).allocate(argument.capture());
assertNull(argument.getValue().getTrackingUrl());
amClient.rmClient = realRM;
amClient
.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
null);
} catch (IOException | YarnException e) {
throw new AssertionError(
"testNoUpdateTrackingUrl unexpectedly threw exception: " + e);
}
}
@Test(timeout = 60000)
public void testUpdateTrackingUrl() {
try {
AMRMClientImpl<ContainerRequest> amClient = null;
amClient = new AMRMClientImpl<>();
amClient.init(conf);
amClient.start();
amClient.registerApplicationMaster("Host", 10000, "");
String trackingUrl = "hadoop.apache.org";
assertEquals("", amClient.appTrackingUrl);
ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol.class);
AllocateResponse mockResponse = mock(AllocateResponse.class);
when(mockRM.allocate(any(AllocateRequest.class)))
.thenReturn(mockResponse);
ApplicationMasterProtocol realRM = amClient.rmClient;
amClient.rmClient = mockRM;
// Do allocate with updated tracking url
amClient.updateTrackingUrl(trackingUrl);
assertEquals(trackingUrl, amClient.newTrackingUrl);
assertEquals("", amClient.appTrackingUrl);
amClient.allocate(0.1f);
assertNull(amClient.newTrackingUrl);
assertEquals(trackingUrl, amClient.appTrackingUrl);
ArgumentCaptor<AllocateRequest> argument
= ArgumentCaptor.forClass(AllocateRequest.class);
verify(mockRM).allocate(argument.capture());
assertEquals(trackingUrl, argument.getValue().getTrackingUrl());
amClient.rmClient = realRM;
amClient
.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
null);
} catch (IOException | YarnException e) {
throw new AssertionError(
"testUpdateTrackingUrl unexpectedly threw exception: " + e);
}
}
} }

View File

@ -54,6 +54,7 @@ public class AllocateRequestPBImpl extends AllocateRequest {
private List<ContainerId> release = null; private List<ContainerId> release = null;
private List<UpdateContainerRequest> updateRequests = null; private List<UpdateContainerRequest> updateRequests = null;
private ResourceBlacklistRequest blacklistRequest = null; private ResourceBlacklistRequest blacklistRequest = null;
private String trackingUrl = null;
public AllocateRequestPBImpl() { public AllocateRequestPBImpl() {
builder = AllocateRequestProto.newBuilder(); builder = AllocateRequestProto.newBuilder();
@ -104,6 +105,9 @@ public class AllocateRequestPBImpl extends AllocateRequest {
if (this.blacklistRequest != null) { if (this.blacklistRequest != null) {
builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest)); builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest));
} }
if (this.trackingUrl != null) {
builder.setTrackingUrl(this.trackingUrl);
}
} }
private void mergeLocalToProto() { private void mergeLocalToProto() {
@ -325,6 +329,27 @@ public class AllocateRequestPBImpl extends AllocateRequest {
} }
} }
@Override
public String getTrackingUrl() {
AllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.trackingUrl != null) {
return this.trackingUrl;
}
if (p.hasTrackingUrl()) {
this.trackingUrl = p.getTrackingUrl();
}
return this.trackingUrl;
}
@Override
public void setTrackingUrl(String trackingUrl) {
maybeInitBuilder();
if (trackingUrl == null) {
builder.clearTrackingUrl();
}
this.trackingUrl = trackingUrl;
}
private void addReleasesToProto() { private void addReleasesToProto() {
maybeInitBuilder(); maybeInitBuilder();
builder.clearRelease(); builder.clearRelease();

View File

@ -365,7 +365,7 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
// Send the status update to the appAttempt. // Send the status update to the appAttempt.
getRmContext().getDispatcher().getEventHandler().handle( getRmContext().getDispatcher().getEventHandler().handle(
new RMAppAttemptStatusupdateEvent(appAttemptId, request new RMAppAttemptStatusupdateEvent(appAttemptId, request
.getProgress())); .getProgress(), request.getTrackingUrl()));
} }
@Override @Override

View File

@ -1806,6 +1806,26 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// Update progress // Update progress
appAttempt.progress = statusUpdateEvent.getProgress(); appAttempt.progress = statusUpdateEvent.getProgress();
// Update tracking url if changed and save it to state store
String newTrackingUrl = statusUpdateEvent.getTrackingUrl();
if (newTrackingUrl != null &&
!newTrackingUrl.equals(appAttempt.originalTrackingUrl)) {
appAttempt.originalTrackingUrl = newTrackingUrl;
ApplicationAttemptStateData attemptState = ApplicationAttemptStateData
.newInstance(appAttempt.applicationAttemptId,
appAttempt.getMasterContainer(),
appAttempt.rmContext.getStateStore()
.getCredentialsFromAppAttempt(appAttempt),
appAttempt.startTime, appAttempt.recoveredFinalState,
newTrackingUrl, appAttempt.getDiagnostics(), null,
ContainerExitStatus.INVALID, appAttempt.getFinishTime(),
appAttempt.attemptMetrics.getAggregateAppResourceUsage()
.getResourceUsageSecondsMap(),
appAttempt.attemptMetrics.getPreemptedResourceSecondsMap());
appAttempt.rmContext.getStateStore()
.updateApplicationAttemptState(attemptState);
}
// Ping to AMLivelinessMonitor // Ping to AMLivelinessMonitor
appAttempt.rmContext.getAMLivelinessMonitor().receivedPing( appAttempt.rmContext.getAMLivelinessMonitor().receivedPing(
statusUpdateEvent.getApplicationAttemptId()); statusUpdateEvent.getApplicationAttemptId());

View File

@ -25,15 +25,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
public class RMAppAttemptStatusupdateEvent extends RMAppAttemptEvent { public class RMAppAttemptStatusupdateEvent extends RMAppAttemptEvent {
private final float progress; private final float progress;
private final String trackingUrl;
public RMAppAttemptStatusupdateEvent(ApplicationAttemptId appAttemptId, public RMAppAttemptStatusupdateEvent(ApplicationAttemptId appAttemptId,
float progress) { float progress) {
this(appAttemptId, progress, null);
}
public RMAppAttemptStatusupdateEvent(ApplicationAttemptId appAttemptId,
float progress, String trackingUrl) {
super(appAttemptId, RMAppAttemptEventType.STATUS_UPDATE); super(appAttemptId, RMAppAttemptEventType.STATUS_UPDATE);
this.progress = progress; this.progress = progress;
this.trackingUrl = trackingUrl;
} }
public float getProgress() { public float getProgress() {
return this.progress; return this.progress;
} }
public String getTrackingUrl() {
return this.trackingUrl;
}
} }

View File

@ -675,4 +675,38 @@ public class TestApplicationMasterService {
Assert.fail("Cannot find RMContainer"); Assert.fail("Cannot find RMContainer");
} }
} }
@Test(timeout = 300000)
public void testUpdateTrackingUrl() throws Exception {
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
// Register node1
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
RMApp app1 = rm.submitApp(2048);
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
Assert.assertEquals("N/A", rm.getRMContext().getRMApps().get(
app1.getApplicationId()).getOriginalTrackingUrl());
AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
String newTrackingUrl = "hadoop.apache.org";
allocateRequest.setTrackingUrl(newTrackingUrl);
am1.allocate(allocateRequest);
Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get(
app1.getApplicationId()).getOriginalTrackingUrl());
// Send it again
am1.allocate(allocateRequest);
Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get(
app1.getApplicationId()).getOriginalTrackingUrl());
rm.stop();
}
} }

View File

@ -2697,6 +2697,51 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
rm2.stop(); rm2.stop();
} }
@Test(timeout = 20000)
public void testRMRestartAfterUpdateTrackingUrl() throws Exception {
MockRM rm = new MockRM(conf);
rm.start();
MemoryRMStateStore memStore = (MemoryRMStateStore) rm.getRMStateStore();
// Register node1
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * 1024);
RMApp app1 = rm.submitApp(2048);
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
String newTrackingUrl = "hadoop.apache.org";
allocateRequest.setTrackingUrl(newTrackingUrl);
am1.allocate(allocateRequest);
// Check in-memory and stored tracking url
Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get(
app1.getApplicationId()).getOriginalTrackingUrl());
Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get(
app1.getApplicationId()).getCurrentAppAttempt()
.getOriginalTrackingUrl());
Assert.assertEquals(newTrackingUrl, memStore.getState()
.getApplicationState().get(app1.getApplicationId())
.getAttempt(attempt1.getAppAttemptId()).getFinalTrackingUrl());
// Start new RM, should recover updated tracking url
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get(
app1.getApplicationId()).getOriginalTrackingUrl());
Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get(
app1.getApplicationId()).getCurrentAppAttempt()
.getOriginalTrackingUrl());
rm.stop();
rm2.stop();
}
private Credentials getCreds() throws IOException { private Credentials getCreds() throws IOException {
Credentials ts = new Credentials(); Credentials ts = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer(); DataOutputBuffer dob = new DataOutputBuffer();