From 0d6e1a2aab09eb1f367b7c2e2b799e0ee31d7370 Mon Sep 17 00:00:00 2001 From: Jonathan Hung Date: Mon, 30 Jul 2018 17:41:01 -0700 Subject: [PATCH] YARN-7974. Allow updating application tracking url after registration. Contributed by Jonathan Hung --- .../api/protocolrecords/AllocateRequest.java | 47 ++++++++++- .../src/main/proto/yarn_service_protos.proto | 1 + .../hadoop/yarn/client/api/AMRMClient.java | 11 +++ .../client/api/async/AMRMClientAsync.java | 11 +++ .../api/async/impl/AMRMClientAsyncImpl.java | 5 ++ .../yarn/client/api/impl/AMRMClientImpl.java | 12 +++ .../yarn/client/api/impl/TestAMRMClient.java | 77 +++++++++++++++++++ .../impl/pb/AllocateRequestPBImpl.java | 27 ++++++- .../resourcemanager/DefaultAMSProcessor.java | 2 +- .../rmapp/attempt/RMAppAttemptImpl.java | 20 +++++ .../event/RMAppAttemptStatusupdateEvent.java | 11 +++ .../TestApplicationMasterService.java | 34 ++++++++ .../server/resourcemanager/TestRMRestart.java | 45 +++++++++++ 13 files changed, 299 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java index ae0891e89af..1b888343f3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java @@ -71,7 +71,21 @@ public abstract class AllocateRequest { .releaseList(containersToBeReleased) .resourceBlacklistRequest(resourceBlacklistRequest).build(); } - + + @Public + @Unstable + public static AllocateRequest newInstance(int responseID, float appProgress, + List resourceAsk, + List containersToBeReleased, + ResourceBlacklistRequest resourceBlacklistRequest, + String trackingUrl) { + return AllocateRequest.newBuilder().responseId(responseID) + .progress(appProgress).askList(resourceAsk) + .releaseList(containersToBeReleased) + .resourceBlacklistRequest(resourceBlacklistRequest) + .trackingUrl(trackingUrl).build(); + } + @Public @Unstable public static AllocateRequest newInstance(int responseID, float appProgress, @@ -212,6 +226,22 @@ public abstract class AllocateRequest { public abstract void setUpdateRequests( List updateRequests); + /** + * Get the tracking url update for this heartbeat. + * @return tracking url to update this application with + */ + @Public + @Unstable + public abstract String getTrackingUrl(); + + /** + * Set the new tracking url for this application. + * @param trackingUrl the new tracking url + */ + @Public + @Unstable + public abstract void setTrackingUrl(String trackingUrl); + @Public @Unstable public static AllocateRequestBuilder newBuilder() { @@ -313,6 +343,19 @@ public abstract class AllocateRequest { return this; } + /** + * Set the trackingUrl of the request. + * @see AllocateRequest#setTrackingUrl(String) + * @param trackingUrl new tracking url + * @return {@link AllocateRequestBuilder} + */ + @Public + @Unstable + public AllocateRequestBuilder trackingUrl(String trackingUrl) { + allocateRequest.setTrackingUrl(trackingUrl); + return this; + } + /** * Return generated {@link AllocateRequest} object. * @return {@link AllocateRequest} @@ -323,4 +366,4 @@ public abstract class AllocateRequest { return allocateRequest; } } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 8d03f5abf42..9ddccf922ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -89,6 +89,7 @@ message AllocateRequestProto { optional int32 response_id = 4; optional float progress = 5; repeated UpdateContainerRequestProto update_requests = 7; + optional string tracking_url = 11; } message NMTokenProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 5ccbef06f7c..3dabf3aab24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -701,6 +701,17 @@ public abstract class AMRMClient extends return this.timelineV2Client; } + /** + * Update application's tracking url on next heartbeat. + * + * @param trackingUrl new tracking url for this application + */ + @Public + @InterfaceStability.Unstable + public void updateTrackingUrl(String trackingUrl) { + // Unimplemented. + } + /** * Wait for check to return true for each 1000 ms. * See also {@link #waitFor(java.util.function.Supplier, int)} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index 793ad79e08d..03211299b3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -374,6 +374,17 @@ extends AbstractService { public abstract void updateBlacklist(List blacklistAdditions, List blacklistRemovals); + /** + * Update application's tracking url on next heartbeat. + * + * @param trackingUrl new tracking url for this application + */ + @Public + @Unstable + public void updateTrackingUrl(String trackingUrl) { + // Unimplemented. + } + /** * Wait for check to return true for each 1000 ms. * See also {@link #waitFor(java.util.function.Supplier, int)} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 031cdecc915..36d9c1932f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -249,6 +249,11 @@ extends AMRMClientAsync { List blacklistRemovals) { client.updateBlacklist(blacklistAdditions, blacklistRemovals); } + + @Override + public void updateTrackingUrl(String trackingUrl) { + client.updateTrackingUrl(trackingUrl); + } private class HeartbeatThread extends Thread { public HeartbeatThread() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index a2c8cfc9203..4646a68078a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -95,6 +95,7 @@ public class AMRMClientImpl extends AMRMClient { protected String appHostName; protected int appHostPort; protected String appTrackingUrl; + protected String newTrackingUrl; protected ApplicationMasterProtocol rmClient; protected Resource clusterAvailableResources; @@ -281,6 +282,12 @@ public class AMRMClientImpl extends AMRMClient { .responseId(lastResponseId).progress(progressIndicator) .askList(askList).resourceBlacklistRequest(blacklistRequest) .releaseList(releaseList).updateRequests(updateList).build(); + + if (this.newTrackingUrl != null) { + allocateRequest.setTrackingUrl(this.newTrackingUrl); + this.appTrackingUrl = this.newTrackingUrl; + this.newTrackingUrl = null; + } // clear blacklistAdditions and blacklistRemovals before // unsynchronized part blacklistAdditions.clear(); @@ -932,6 +939,11 @@ public class AMRMClientImpl extends AMRMClient { } } + @Override + public synchronized void updateTrackingUrl(String trackingUrl) { + this.newTrackingUrl = trackingUrl; + } + private void updateAMRMToken(Token token) throws IOException { org.apache.hadoop.security.token.Token amrmToken = new org.apache.hadoop.security.token.Token(token diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index de1156c3a3c..6f7ed5093b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -20,10 +20,12 @@ package org.apache.hadoop.yarn.client.api.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; @@ -88,6 +90,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.eclipse.jetty.util.log.Log; @@ -2052,4 +2055,78 @@ public class TestAMRMClient { } return result; } + + @Test(timeout = 60000) + public void testNoUpdateTrackingUrl() { + try { + AMRMClientImpl amClient = null; + amClient = new AMRMClientImpl<>(); + amClient.init(conf); + amClient.start(); + amClient.registerApplicationMaster("Host", 10000, ""); + + assertEquals("", amClient.appTrackingUrl); + + ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol.class); + AllocateResponse mockResponse = mock(AllocateResponse.class); + when(mockRM.allocate(any(AllocateRequest.class))) + .thenReturn(mockResponse); + ApplicationMasterProtocol realRM = amClient.rmClient; + amClient.rmClient = mockRM; + // Do allocate without updated tracking url + amClient.allocate(0.1f); + ArgumentCaptor argument = + ArgumentCaptor.forClass(AllocateRequest.class); + verify(mockRM).allocate(argument.capture()); + assertNull(argument.getValue().getTrackingUrl()); + + amClient.rmClient = realRM; + amClient + .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, + null); + } catch (IOException | YarnException e) { + throw new AssertionError( + "testNoUpdateTrackingUrl unexpectedly threw exception: " + e); + } + } + + @Test(timeout = 60000) + public void testUpdateTrackingUrl() { + try { + AMRMClientImpl amClient = null; + amClient = new AMRMClientImpl<>(); + amClient.init(conf); + amClient.start(); + amClient.registerApplicationMaster("Host", 10000, ""); + + String trackingUrl = "hadoop.apache.org"; + assertEquals("", amClient.appTrackingUrl); + + ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol.class); + AllocateResponse mockResponse = mock(AllocateResponse.class); + when(mockRM.allocate(any(AllocateRequest.class))) + .thenReturn(mockResponse); + ApplicationMasterProtocol realRM = amClient.rmClient; + amClient.rmClient = mockRM; + // Do allocate with updated tracking url + amClient.updateTrackingUrl(trackingUrl); + assertEquals(trackingUrl, amClient.newTrackingUrl); + assertEquals("", amClient.appTrackingUrl); + amClient.allocate(0.1f); + assertNull(amClient.newTrackingUrl); + assertEquals(trackingUrl, amClient.appTrackingUrl); + ArgumentCaptor argument + = ArgumentCaptor.forClass(AllocateRequest.class); + verify(mockRM).allocate(argument.capture()); + assertEquals(trackingUrl, argument.getValue().getTrackingUrl()); + + amClient.rmClient = realRM; + amClient + .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, + null); + } catch (IOException | YarnException e) { + throw new AssertionError( + "testUpdateTrackingUrl unexpectedly threw exception: " + e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java index 0f0f5710332..5ffbcec5401 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java @@ -54,6 +54,7 @@ public class AllocateRequestPBImpl extends AllocateRequest { private List release = null; private List updateRequests = null; private ResourceBlacklistRequest blacklistRequest = null; + private String trackingUrl = null; public AllocateRequestPBImpl() { builder = AllocateRequestProto.newBuilder(); @@ -104,6 +105,9 @@ public class AllocateRequestPBImpl extends AllocateRequest { if (this.blacklistRequest != null) { builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest)); } + if (this.trackingUrl != null) { + builder.setTrackingUrl(this.trackingUrl); + } } private void mergeLocalToProto() { @@ -324,7 +328,28 @@ public class AllocateRequestPBImpl extends AllocateRequest { this.release.add(convertFromProtoFormat(c)); } } - + + @Override + public String getTrackingUrl() { + AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.trackingUrl != null) { + return this.trackingUrl; + } + if (p.hasTrackingUrl()) { + this.trackingUrl = p.getTrackingUrl(); + } + return this.trackingUrl; + } + + @Override + public void setTrackingUrl(String trackingUrl) { + maybeInitBuilder(); + if (trackingUrl == null) { + builder.clearTrackingUrl(); + } + this.trackingUrl = trackingUrl; + } + private void addReleasesToProto() { maybeInitBuilder(); builder.clearRelease(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index 68743383770..99d38717e38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -365,7 +365,7 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { // Send the status update to the appAttempt. getRmContext().getDispatcher().getEventHandler().handle( new RMAppAttemptStatusupdateEvent(appAttemptId, request - .getProgress())); + .getProgress(), request.getTrackingUrl())); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 57837f99adb..2f5b0e1c0b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -1806,6 +1806,26 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { // Update progress appAttempt.progress = statusUpdateEvent.getProgress(); + // Update tracking url if changed and save it to state store + String newTrackingUrl = statusUpdateEvent.getTrackingUrl(); + if (newTrackingUrl != null && + !newTrackingUrl.equals(appAttempt.originalTrackingUrl)) { + appAttempt.originalTrackingUrl = newTrackingUrl; + ApplicationAttemptStateData attemptState = ApplicationAttemptStateData + .newInstance(appAttempt.applicationAttemptId, + appAttempt.getMasterContainer(), + appAttempt.rmContext.getStateStore() + .getCredentialsFromAppAttempt(appAttempt), + appAttempt.startTime, appAttempt.recoveredFinalState, + newTrackingUrl, appAttempt.getDiagnostics(), null, + ContainerExitStatus.INVALID, appAttempt.getFinishTime(), + appAttempt.attemptMetrics.getAggregateAppResourceUsage() + .getResourceUsageSecondsMap(), + appAttempt.attemptMetrics.getPreemptedResourceSecondsMap()); + appAttempt.rmContext.getStateStore() + .updateApplicationAttemptState(attemptState); + } + // Ping to AMLivelinessMonitor appAttempt.rmContext.getAMLivelinessMonitor().receivedPing( statusUpdateEvent.getApplicationAttemptId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStatusupdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStatusupdateEvent.java index b1b63b14efd..1b7442d8a3a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStatusupdateEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStatusupdateEvent.java @@ -25,15 +25,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE public class RMAppAttemptStatusupdateEvent extends RMAppAttemptEvent { private final float progress; + private final String trackingUrl; public RMAppAttemptStatusupdateEvent(ApplicationAttemptId appAttemptId, float progress) { + this(appAttemptId, progress, null); + } + + public RMAppAttemptStatusupdateEvent(ApplicationAttemptId appAttemptId, + float progress, String trackingUrl) { super(appAttemptId, RMAppAttemptEventType.STATUS_UPDATE); this.progress = progress; + this.trackingUrl = trackingUrl; } public float getProgress() { return this.progress; } + public String getTrackingUrl() { + return this.trackingUrl; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 421ddbc29b6..aa0085ba648 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -675,4 +675,38 @@ public class TestApplicationMasterService { Assert.fail("Cannot find RMContainer"); } } + + @Test(timeout = 300000) + public void testUpdateTrackingUrl() throws Exception { + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); + + RMApp app1 = rm.submitApp(2048); + + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + Assert.assertEquals("N/A", rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getOriginalTrackingUrl()); + + AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); + String newTrackingUrl = "hadoop.apache.org"; + allocateRequest.setTrackingUrl(newTrackingUrl); + + am1.allocate(allocateRequest); + Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getOriginalTrackingUrl()); + + // Send it again + am1.allocate(allocateRequest); + Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getOriginalTrackingUrl()); + rm.stop(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index f0f51f32b72..414c4fc71ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -2697,6 +2697,51 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { rm2.stop(); } + @Test(timeout = 20000) + public void testRMRestartAfterUpdateTrackingUrl() throws Exception { + MockRM rm = new MockRM(conf); + rm.start(); + + MemoryRMStateStore memStore = (MemoryRMStateStore) rm.getRMStateStore(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * 1024); + + RMApp app1 = rm.submitApp(2048); + + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); + String newTrackingUrl = "hadoop.apache.org"; + allocateRequest.setTrackingUrl(newTrackingUrl); + + am1.allocate(allocateRequest); + // Check in-memory and stored tracking url + Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getOriginalTrackingUrl()); + Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getCurrentAppAttempt() + .getOriginalTrackingUrl()); + Assert.assertEquals(newTrackingUrl, memStore.getState() + .getApplicationState().get(app1.getApplicationId()) + .getAttempt(attempt1.getAppAttemptId()).getFinalTrackingUrl()); + + // Start new RM, should recover updated tracking url + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getOriginalTrackingUrl()); + Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getCurrentAppAttempt() + .getOriginalTrackingUrl()); + + rm.stop(); + rm2.stop(); + } + private Credentials getCreds() throws IOException { Credentials ts = new Credentials(); DataOutputBuffer dob = new DataOutputBuffer();