YARN-1879. Marked Idempotent/AtMostOnce annotations to ApplicationMasterProtocol for RM fail over. Contributed by Tsuyoshi OZAWA
(cherry picked from commit c3de2412eb
)
This commit is contained in:
parent
9183dadb73
commit
4d03915be9
|
@ -294,6 +294,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2621. Simplify the output when the user doesn't have the access for
|
YARN-2621. Simplify the output when the user doesn't have the access for
|
||||||
getDomain(s). (Zhijie Shen via jianhe)
|
getDomain(s). (Zhijie Shen via jianhe)
|
||||||
|
|
||||||
|
YARN-1879. Marked Idempotent/AtMostOnce annotations to ApplicationMasterProtocol
|
||||||
|
for RM fail over. (Tsuyoshi OZAWA via jianhe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||||
import org.apache.hadoop.io.retry.AtMostOnce;
|
import org.apache.hadoop.io.retry.AtMostOnce;
|
||||||
|
import org.apache.hadoop.io.retry.Idempotent;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
|
@ -80,6 +81,7 @@ public interface ApplicationMasterProtocol {
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Stable
|
@Stable
|
||||||
|
@Idempotent
|
||||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||||
RegisterApplicationMasterRequest request)
|
RegisterApplicationMasterRequest request)
|
||||||
throws YarnException, IOException;
|
throws YarnException, IOException;
|
||||||
|
@ -104,6 +106,7 @@ public interface ApplicationMasterProtocol {
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Stable
|
@Stable
|
||||||
|
@AtMostOnce
|
||||||
public FinishApplicationMasterResponse finishApplicationMaster(
|
public FinishApplicationMasterResponse finishApplicationMaster(
|
||||||
FinishApplicationMasterRequest request)
|
FinishApplicationMasterRequest request)
|
||||||
throws YarnException, IOException;
|
throws YarnException, IOException;
|
||||||
|
|
|
@ -18,6 +18,14 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.client;
|
package org.apache.hadoop.yarn.client;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
@ -122,7 +130,23 @@ import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
|
|
||||||
public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
|
/**
|
||||||
|
* Test Base for ResourceManager's Protocol on HA.
|
||||||
|
*
|
||||||
|
* Limited scope:
|
||||||
|
* For all the test cases, we only test whether the method will be re-entered
|
||||||
|
* when failover happens. Does not cover the entire logic of test.
|
||||||
|
*
|
||||||
|
* Test strategy:
|
||||||
|
* Create a separate failover thread with a trigger flag,
|
||||||
|
* override all APIs that are added trigger flag.
|
||||||
|
* When the APIs are called, we will set trigger flag as true to kick off
|
||||||
|
* the failover. So We can make sure the failover happens during process
|
||||||
|
* of the method. If this API is marked as @Idempotent or @AtMostOnce,
|
||||||
|
* the test cases will pass; otherwise, they will throw the exception.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
|
||||||
protected static final HAServiceProtocol.StateChangeRequestInfo req =
|
protected static final HAServiceProtocol.StateChangeRequestInfo req =
|
||||||
new HAServiceProtocol.StateChangeRequestInfo(
|
new HAServiceProtocol.StateChangeRequestInfo(
|
||||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||||
|
@ -760,6 +784,43 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
|
||||||
return createFakeAllocateResponse();
|
return createFakeAllocateResponse();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||||
|
RegisterApplicationMasterRequest request) throws YarnException,
|
||||||
|
IOException {
|
||||||
|
resetStartFailoverFlag(true);
|
||||||
|
// make sure failover has been triggered
|
||||||
|
Assert.assertTrue(waittingForFailOver());
|
||||||
|
return createFakeRegisterApplicationMasterResponse();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FinishApplicationMasterResponse finishApplicationMaster(
|
||||||
|
FinishApplicationMasterRequest request) throws YarnException,
|
||||||
|
IOException {
|
||||||
|
resetStartFailoverFlag(true);
|
||||||
|
// make sure failover has been triggered
|
||||||
|
Assert.assertTrue(waittingForFailOver());
|
||||||
|
return createFakeFinishApplicationMasterResponse();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public RegisterApplicationMasterResponse
|
||||||
|
createFakeRegisterApplicationMasterResponse() {
|
||||||
|
Resource minCapability = Resource.newInstance(2048, 2);
|
||||||
|
Resource maxCapability = Resource.newInstance(4096, 4);
|
||||||
|
Map<ApplicationAccessType, String> acls =
|
||||||
|
new HashMap<ApplicationAccessType, String>();
|
||||||
|
acls.put(ApplicationAccessType.MODIFY_APP, "*");
|
||||||
|
ByteBuffer key = ByteBuffer.wrap("fake_key".getBytes());
|
||||||
|
return RegisterApplicationMasterResponse.newInstance(minCapability,
|
||||||
|
maxCapability, acls, key, new ArrayList<Container>(), "root",
|
||||||
|
new ArrayList<NMToken>());
|
||||||
|
}
|
||||||
|
|
||||||
|
public FinishApplicationMasterResponse
|
||||||
|
createFakeFinishApplicationMasterResponse() {
|
||||||
|
return FinishApplicationMasterResponse.newInstance(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public AllocateResponse createFakeAllocateResponse() {
|
public AllocateResponse createFakeAllocateResponse() {
|
||||||
|
@ -770,4 +831,5 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
|
||||||
null, new ArrayList<NMToken>());
|
null, new ArrayList<NMToken>());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,92 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.client;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
|
|
||||||
import org.junit.Assert;
|
|
||||||
|
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.ipc.RPC;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.apache.hadoop.security.token.Token;
|
|
||||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
|
|
||||||
public class TestApplicationMasterServiceOnHA extends ProtocolHATestBase{
|
|
||||||
private ApplicationMasterProtocol amClient;
|
|
||||||
private ApplicationAttemptId attemptId ;
|
|
||||||
RMAppAttempt appAttempt;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void initiate() throws Exception {
|
|
||||||
startHACluster(0, false, false, true);
|
|
||||||
attemptId = this.cluster.createFakeApplicationAttemptId();
|
|
||||||
amClient = ClientRMProxy
|
|
||||||
.createRMProxy(this.conf, ApplicationMasterProtocol.class);
|
|
||||||
|
|
||||||
Token<AMRMTokenIdentifier> appToken =
|
|
||||||
this.cluster.getResourceManager().getRMContext()
|
|
||||||
.getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
|
|
||||||
appToken.setService(ClientRMProxy.getAMRMTokenService(conf));
|
|
||||||
UserGroupInformation.setLoginUser(UserGroupInformation
|
|
||||||
.createRemoteUser(UserGroupInformation.getCurrentUser()
|
|
||||||
.getUserName()));
|
|
||||||
UserGroupInformation.getCurrentUser().addToken(appToken);
|
|
||||||
syncToken(appToken);
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void shutDown() {
|
|
||||||
if(this.amClient != null) {
|
|
||||||
RPC.stopProxy(this.amClient);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout = 15000)
|
|
||||||
public void testAllocateOnHA() throws YarnException, IOException {
|
|
||||||
AllocateRequest request = AllocateRequest.newInstance(0, 50f,
|
|
||||||
new ArrayList<ResourceRequest>(),
|
|
||||||
new ArrayList<ContainerId>(),
|
|
||||||
ResourceBlacklistRequest.newInstance(new ArrayList<String>(),
|
|
||||||
new ArrayList<String>()));
|
|
||||||
AllocateResponse response = amClient.allocate(request);
|
|
||||||
Assert.assertEquals(response, this.cluster.createFakeAllocateResponse());
|
|
||||||
}
|
|
||||||
|
|
||||||
private void syncToken(Token<AMRMTokenIdentifier> token) throws IOException {
|
|
||||||
for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) {
|
|
||||||
this.cluster.getResourceManager(i).getRMContext()
|
|
||||||
.getAMRMTokenSecretManager().addPersistedPassword(token);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -343,6 +343,15 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
authorizeRequest().getApplicationAttemptId();
|
authorizeRequest().getApplicationAttemptId();
|
||||||
ApplicationId appId = applicationAttemptId.getApplicationId();
|
ApplicationId appId = applicationAttemptId.getApplicationId();
|
||||||
|
|
||||||
|
RMApp rmApp =
|
||||||
|
rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
|
||||||
|
// checking whether the app exits in RMStateStore at first not to throw
|
||||||
|
// ApplicationDoesNotExistInCacheException before and after
|
||||||
|
// RM work-preserving restart.
|
||||||
|
if (rmApp.isAppFinalStateStored()) {
|
||||||
|
return FinishApplicationMasterResponse.newInstance(true);
|
||||||
|
}
|
||||||
|
|
||||||
AllocateResponseLock lock = responseMap.get(applicationAttemptId);
|
AllocateResponseLock lock = responseMap.get(applicationAttemptId);
|
||||||
if (lock == null) {
|
if (lock == null) {
|
||||||
throwApplicationDoesNotExistInCacheException(applicationAttemptId);
|
throwApplicationDoesNotExistInCacheException(applicationAttemptId);
|
||||||
|
@ -366,13 +375,6 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
|
|
||||||
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
|
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
|
||||||
|
|
||||||
RMApp rmApp =
|
|
||||||
rmContext.getRMApps().get(appId);
|
|
||||||
|
|
||||||
if (rmApp.isAppFinalStateStored()) {
|
|
||||||
return FinishApplicationMasterResponse.newInstance(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
rmContext.getDispatcher().getEventHandler().handle(
|
rmContext.getDispatcher().getEventHandler().handle(
|
||||||
new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
|
new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
|
||||||
.getTrackingUrl(), request.getFinalApplicationStatus(), request
|
.getTrackingUrl(), request.getFinalApplicationStatus(), request
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
@ -51,6 +52,7 @@ public class MockAM {
|
||||||
private final ApplicationAttemptId attemptId;
|
private final ApplicationAttemptId attemptId;
|
||||||
private RMContext context;
|
private RMContext context;
|
||||||
private ApplicationMasterProtocol amRMProtocol;
|
private ApplicationMasterProtocol amRMProtocol;
|
||||||
|
private UserGroupInformation ugi;
|
||||||
|
|
||||||
private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
|
private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
|
||||||
private final List<ContainerId> releases = new ArrayList<ContainerId>();
|
private final List<ContainerId> releases = new ArrayList<ContainerId>();
|
||||||
|
@ -101,15 +103,18 @@ public class MockAM {
|
||||||
req.setHost("");
|
req.setHost("");
|
||||||
req.setRpcPort(1);
|
req.setRpcPort(1);
|
||||||
req.setTrackingUrl("");
|
req.setTrackingUrl("");
|
||||||
UserGroupInformation ugi =
|
if (ugi == null) {
|
||||||
UserGroupInformation.createRemoteUser(attemptId.toString());
|
ugi = UserGroupInformation.createRemoteUser(
|
||||||
Token<AMRMTokenIdentifier> token =
|
attemptId.toString());
|
||||||
context.getRMApps().get(attemptId.getApplicationId())
|
Token<AMRMTokenIdentifier> token =
|
||||||
.getRMAppAttempt(attemptId).getAMRMToken();
|
context.getRMApps().get(attemptId.getApplicationId())
|
||||||
ugi.addTokenIdentifier(token.decodeIdentifier());
|
.getRMAppAttempt(attemptId).getAMRMToken();
|
||||||
|
ugi.addTokenIdentifier(token.decodeIdentifier());
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
return ugi
|
return ugi
|
||||||
.doAs(new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
|
.doAs(
|
||||||
|
new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
|
||||||
@Override
|
@Override
|
||||||
public RegisterApplicationMasterResponse run() throws Exception {
|
public RegisterApplicationMasterResponse run() throws Exception {
|
||||||
return amRMProtocol.registerApplicationMaster(req);
|
return amRMProtocol.registerApplicationMaster(req);
|
||||||
|
@ -245,10 +250,15 @@ public class MockAM {
|
||||||
|
|
||||||
public void unregisterAppAttempt() throws Exception {
|
public void unregisterAppAttempt() throws Exception {
|
||||||
waitForState(RMAppAttemptState.RUNNING);
|
waitForState(RMAppAttemptState.RUNNING);
|
||||||
|
unregisterAppAttempt(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unregisterAppAttempt(boolean waitForStateRunning)
|
||||||
|
throws Exception {
|
||||||
final FinishApplicationMasterRequest req =
|
final FinishApplicationMasterRequest req =
|
||||||
FinishApplicationMasterRequest.newInstance(
|
FinishApplicationMasterRequest.newInstance(
|
||||||
FinalApplicationStatus.SUCCEEDED, "", "");
|
FinalApplicationStatus.SUCCEEDED, "", "");
|
||||||
unregisterAppAttempt(req,true);
|
unregisterAppAttempt(req, waitForStateRunning);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void unregisterAppAttempt(final FinishApplicationMasterRequest req,
|
public void unregisterAppAttempt(final FinishApplicationMasterRequest req,
|
||||||
|
@ -256,19 +266,25 @@ public class MockAM {
|
||||||
if (waitForStateRunning) {
|
if (waitForStateRunning) {
|
||||||
waitForState(RMAppAttemptState.RUNNING);
|
waitForState(RMAppAttemptState.RUNNING);
|
||||||
}
|
}
|
||||||
UserGroupInformation ugi =
|
if (ugi == null) {
|
||||||
UserGroupInformation.createRemoteUser(attemptId.toString());
|
ugi = UserGroupInformation.createRemoteUser(attemptId.toString());
|
||||||
Token<AMRMTokenIdentifier> token =
|
Token<AMRMTokenIdentifier> token =
|
||||||
context.getRMApps().get(attemptId.getApplicationId())
|
context.getRMApps()
|
||||||
.getRMAppAttempt(attemptId).getAMRMToken();
|
.get(attemptId.getApplicationId())
|
||||||
ugi.addTokenIdentifier(token.decodeIdentifier());
|
.getRMAppAttempt(attemptId).getAMRMToken();
|
||||||
ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
ugi.addTokenIdentifier(token.decodeIdentifier());
|
||||||
@Override
|
}
|
||||||
public Object run() throws Exception {
|
try {
|
||||||
amRMProtocol.finishApplicationMaster(req);
|
ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
||||||
return null;
|
@Override
|
||||||
}
|
public Object run() throws Exception {
|
||||||
});
|
amRMProtocol.finishApplicationMaster(req);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (UndeclaredThrowableException e) {
|
||||||
|
throw (Exception) e.getCause();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public ApplicationAttemptId getApplicationAttemptId() {
|
public ApplicationAttemptId getApplicationAttemptId() {
|
||||||
|
|
|
@ -23,9 +23,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
|
||||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
import org.junit.Assert;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -42,15 +40,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.Assert;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.ArrayList;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import static java.lang.Thread.sleep;
|
import static java.lang.Thread.sleep;
|
||||||
import static org.mockito.Matchers.any;
|
|
||||||
|
|
||||||
public class TestApplicationMasterService {
|
public class TestApplicationMasterService {
|
||||||
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
|
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
|
||||||
|
@ -240,25 +242,23 @@ public class TestApplicationMasterService {
|
||||||
FinishApplicationMasterRequest req =
|
FinishApplicationMasterRequest req =
|
||||||
FinishApplicationMasterRequest.newInstance(
|
FinishApplicationMasterRequest.newInstance(
|
||||||
FinalApplicationStatus.FAILED, "", "");
|
FinalApplicationStatus.FAILED, "", "");
|
||||||
Throwable cause = null;
|
|
||||||
try {
|
try {
|
||||||
am1.unregisterAppAttempt(req, false);
|
am1.unregisterAppAttempt(req, false);
|
||||||
|
Assert.fail("ApplicationMasterNotRegisteredException should be thrown");
|
||||||
|
} catch (ApplicationMasterNotRegisteredException e) {
|
||||||
|
Assert.assertNotNull(e);
|
||||||
|
Assert.assertNotNull(e.getMessage());
|
||||||
|
Assert.assertTrue(e.getMessage().contains(
|
||||||
|
"Application Master is trying to unregister before registering for:"
|
||||||
|
));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
cause = e.getCause();
|
Assert.fail("ApplicationMasterNotRegisteredException should be thrown");
|
||||||
}
|
}
|
||||||
Assert.assertNotNull(cause);
|
|
||||||
Assert
|
|
||||||
.assertTrue(cause instanceof ApplicationMasterNotRegisteredException);
|
|
||||||
Assert.assertNotNull(cause.getMessage());
|
|
||||||
Assert
|
|
||||||
.assertTrue(cause
|
|
||||||
.getMessage()
|
|
||||||
.contains(
|
|
||||||
"Application Master is trying to unregister before registering for:"));
|
|
||||||
|
|
||||||
am1.registerAppAttempt();
|
am1.registerAppAttempt();
|
||||||
|
|
||||||
am1.unregisterAppAttempt(req, false);
|
am1.unregisterAppAttempt(req, false);
|
||||||
|
am1.waitForState(RMAppAttemptState.FINISHING);
|
||||||
} finally {
|
} finally {
|
||||||
if (rm != null) {
|
if (rm != null) {
|
||||||
rm.stop();
|
rm.stop();
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
@ -668,6 +670,9 @@ public class TestWorkPreservingRMRestart {
|
||||||
// create app and launch the AM
|
// create app and launch the AM
|
||||||
RMApp app0 = rm1.submitApp(200);
|
RMApp app0 = rm1.submitApp(200);
|
||||||
MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
|
MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
|
||||||
|
// Issuing registerAppAttempt() before and after RM restart to confirm
|
||||||
|
// registerApplicationMaster() is idempotent.
|
||||||
|
am0.registerAppAttempt();
|
||||||
|
|
||||||
// start new RM
|
// start new RM
|
||||||
rm2 = new MockRM(conf, memStore);
|
rm2 = new MockRM(conf, memStore);
|
||||||
|
@ -676,6 +681,7 @@ public class TestWorkPreservingRMRestart {
|
||||||
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
|
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
|
||||||
|
|
||||||
am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
|
am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
|
||||||
|
// retry registerApplicationMaster() after RM restart.
|
||||||
am0.registerAppAttempt(true);
|
am0.registerAppAttempt(true);
|
||||||
|
|
||||||
rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
|
rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
|
||||||
|
@ -897,4 +903,44 @@ public class TestWorkPreservingRMRestart {
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Testing to confirm that retried finishApplicationMaster() doesn't throw
|
||||||
|
* InvalidApplicationMasterRequest before and after RM restart.
|
||||||
|
*/
|
||||||
|
@Test (timeout = 20000)
|
||||||
|
public void testRetriedFinishApplicationMasterRequest()
|
||||||
|
throws Exception {
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
|
||||||
|
// start RM
|
||||||
|
rm1 = new MockRM(conf, memStore);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
|
||||||
|
// create app and launch the AM
|
||||||
|
RMApp app0 = rm1.submitApp(200);
|
||||||
|
MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
|
||||||
|
|
||||||
|
am0.registerAppAttempt();
|
||||||
|
|
||||||
|
// Emulating following a scenario:
|
||||||
|
// RM1 saves the app in RMStateStore and then crashes,
|
||||||
|
// FinishApplicationMasterResponse#isRegistered still return false,
|
||||||
|
// so AM still retry the 2nd RM
|
||||||
|
MockRM.finishAMAndVerifyAppState(app0, rm1, nm1, am0);
|
||||||
|
|
||||||
|
|
||||||
|
// start new RM
|
||||||
|
rm2 = new MockRM(conf, memStore);
|
||||||
|
rm2.start();
|
||||||
|
|
||||||
|
am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
|
||||||
|
am0.unregisterAppAttempt(false);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue