YARN-11283. Fix Typo of NodeManager amrmproxy. (#4899)

This commit is contained in:
slfan1989 2022-09-20 04:16:25 +08:00 committed by GitHub
parent 342c4856b8
commit f52b900a5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 88 additions and 88 deletions

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.Context;
/**
* Interface that can be used by the intercepter plugins to get the information
* Interface that can be used by the interceptor plugins to get the information
* about one application.
*
*/

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
/**
* Encapsulates the information about one application that is needed by the
* request intercepters.
* request interceptors.
*
*/
public class AMRMProxyApplicationContextImpl implements

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
/**
* Implements the RequestInterceptor interface and provides common functionality
* which can can be used and/or extended by other concrete intercepter classes.
* which can can be used and/or extended by other concrete interceptor classes.
*
*/
public abstract class AbstractRequestInterceptor implements

View File

@ -104,7 +104,7 @@ import org.apache.hadoop.util.Preconditions;
* Extends the AbstractRequestInterceptor and provides an implementation for
* federation of YARN RM and scaling an application across multiple YARN
* sub-clusters. All the federation specific implementation is encapsulated in
* this class. This is always the last intercepter in the chain.
* this class. This is always the last interceptor in the chain.
*/
public class FederationInterceptor extends AbstractRequestInterceptor {
private static final Logger LOG =
@ -654,7 +654,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
if (AMRMClientUtils.getNextResponseId(
request.getResponseId()) == this.lastAllocateResponse
.getResponseId()) {
// heartbeat one step old, simply return lastReponse
// heartbeat one step old, simply return lastResponse
return this.lastAllocateResponse;
} else if (request.getResponseId() != this.lastAllocateResponse
.getResponseId()) {
@ -1572,7 +1572,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
/**
* Check to see if an AllocateRequest exists in the Map for the specified sub
* cluster. If not found, create a new one, copy the value of responseId and
* progress from the orignialAMRequest, save it in the specified Map and
* progress from the originalAMRequest, save it in the specified Map and
* return the new instance. If found, just return the old instance.
*/
private static AllocateRequest findOrCreateAllocateRequestForSubCluster(

View File

@ -24,14 +24,14 @@ import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
/**
* Defines the contract to be implemented by the request intercepter classes,
* Defines the contract to be implemented by the request interceptor classes,
* that can be used to intercept and inspect messages sent from the application
* master to the resource manager.
*/
public interface RequestInterceptor extends DistributedSchedulingAMProtocol,
Configurable {
/**
* This method is called for initializing the intercepter. This is guaranteed
* This method is called for initializing the interceptor. This is guaranteed
* to be called only once in the lifetime of this instance.
*
* @param ctx AMRMProxy application context
@ -39,42 +39,42 @@ public interface RequestInterceptor extends DistributedSchedulingAMProtocol,
void init(AMRMProxyApplicationContext ctx);
/**
* Recover intercepter state when NM recovery is enabled. AMRMProxy will
* Recover interceptor state when NM recovery is enabled. AMRMProxy will
* recover the data map into
* AMRMProxyApplicationContext.getRecoveredDataMap(). All intercepters should
* AMRMProxyApplicationContext.getRecoveredDataMap(). All interceptors should
* recover state from it.
*
* For example, registerRequest has to be saved by the last intercepter (i.e.
* For example, registerRequest has to be saved by the last interceptor (i.e.
* the one that actually connects to RM), in order to re-register when RM
* fails over.
*
* @param recoveredDataMap states for all intercepters recovered from NMSS
* @param recoveredDataMap states for all interceptors recovered from NMSS
*/
void recover(Map<String, byte[]> recoveredDataMap);
/**
* This method is called to release the resources held by the intercepter.
* This method is called to release the resources held by the interceptor.
* This will be called when the application pipeline is being destroyed. The
* concrete implementations should dispose the resources and forward the
* request to the next intercepter, if any.
* request to the next interceptor, if any.
*/
void shutdown();
/**
* Sets the next intercepter in the pipeline. The concrete implementation of
* Sets the next interceptor in the pipeline. The concrete implementation of
* this interface should always pass the request to the nextInterceptor after
* inspecting the message. The last intercepter in the chain is responsible to
* inspecting the message. The last interceptor in the chain is responsible to
* send the messages to the resource manager service and so the last
* intercepter will not receive this method call.
* interceptor will not receive this method call.
*
* @param nextInterceptor the next intercepter to set
* @param nextInterceptor the next interceptor to set
*/
void setNextInterceptor(RequestInterceptor nextInterceptor);
/**
* Returns the next intercepter in the chain.
* Returns the next interceptor in the chain.
*
* @return the next intercepter in the chain
* @return the next interceptor in the chain
*/
RequestInterceptor getNextInterceptor();

View File

@ -134,7 +134,7 @@ public abstract class BaseAMRMProxyTest {
String mockPassThroughInterceptorClass =
PassThroughRequestInterceptor.class.getName();
// Create a request intercepter pipeline for testing. The last one in the
// Create a request interceptor pipeline for testing. The last one in the
// chain will call the mock resource manager. The others in the chain will
// simply forward it to the next one in the chain
config.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
@ -191,7 +191,7 @@ public abstract class BaseAMRMProxyTest {
return new NMContext(null, null, null, null, stateStore, false, this.conf);
}
// A utility method for intercepter recover unit test
// A utility method for interceptor recover unit test
protected Map<String, byte[]> recoverDataMapForAppAttempt(
NMStateStoreService nmStateStore, ApplicationAttemptId attemptId)
throws IOException {
@ -341,7 +341,7 @@ public abstract class BaseAMRMProxyTest {
Assert.assertEquals(Integer.toString(index), response
.getResponse().getQueue());
LOG.info("Sucessfully registered application master with test context: "
LOG.info("Successfully registered application master with test context: "
+ testContext);
} catch (Throwable ex) {
response = null;
@ -424,7 +424,7 @@ public abstract class BaseAMRMProxyTest {
testContext);
Assert.assertNotNull(response.getResponse());
LOG.info("Sucessfully finished application master with test contexts: "
LOG.info("Successfully finished application master with test contexts: "
+ testContext);
} catch (Throwable ex) {
response = null;

View File

@ -29,8 +29,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* Mock intercepter that does not do anything other than forwarding it to the
* next intercepter in the chain
* Mock interceptor that does not do anything other than forwarding it to the
* next interceptor in the chain
*
*/
public class PassThroughRequestInterceptor extends

View File

@ -82,11 +82,11 @@ public class TestAMRMProxyMetrics extends BaseAMRMProxyTest {
AllocateResponse allocateResponse = allocate(testAppId);
Assert.assertNotNull(allocateResponse);
FinishApplicationMasterResponse finshResponse =
FinishApplicationMasterResponse finishResponse =
finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
Assert.assertEquals(failedAppStartRequests, metrics.getFailedAppStartRequests());
Assert.assertEquals(failedRegisterAMRequests, metrics.getFailedRegisterAMRequests());
@ -126,10 +126,10 @@ public class TestAMRMProxyMetrics extends BaseAMRMProxyTest {
Assert
.assertEquals(Integer.toString(testAppId), registerResponse.getQueue());
FinishApplicationMasterResponse finshResponse =
FinishApplicationMasterResponse finishResponse =
finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED);
Assert.assertNotNull(finshResponse);
Assert.assertNotNull(finishResponse);
try {
// Try to finish an application master that is already finished.

View File

@ -128,7 +128,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
getAMRMProxyService().getPipelines();
ApplicationId id = getApplicationId(testAppId);
Assert.assertTrue(
"The interceptor pipeline should be removed if initializtion fails",
"The interceptor pipeline should be removed if initialization fails",
pipelines.get(id) == null);
}
}
@ -140,7 +140,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
* @throws Exception
*/
@Test
public void testRegisterMulitpleApplicationMasters() throws Exception {
public void testRegisterMultipleApplicationMasters() throws Exception {
for (int testAppId = 0; testAppId < 3; testAppId++) {
RegisterApplicationMasterResponse response =
registerApplicationMaster(testAppId);
@ -157,7 +157,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
* @throws Exception
*/
@Test
public void testRegisterMulitpleApplicationMastersInParallel()
public void testRegisterMultipleApplicationMastersInParallel()
throws Exception {
int numberOfRequests = 5;
ArrayList<String> testContexts =
@ -185,12 +185,12 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
Assert.assertEquals(Integer.toString(testAppId),
registerResponse.getQueue());
FinishApplicationMasterResponse finshResponse =
FinishApplicationMasterResponse finishResponse =
finishApplicationMaster(testAppId,
FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
}
@Test
@ -202,10 +202,10 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
Assert.assertEquals(Integer.toString(testAppId),
registerResponse.getQueue());
FinishApplicationMasterResponse finshResponse =
FinishApplicationMasterResponse finishResponse =
finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED);
Assert.assertNotNull(finshResponse);
Assert.assertNotNull(finishResponse);
try {
// Try to finish an application master that is already finished.
@ -232,7 +232,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
}
@Test
public void testFinishMulitpleApplicationMasters() throws Exception {
public void testFinishMultipleApplicationMasters() throws Exception {
int numberOfRequests = 3;
for (int index = 0; index < numberOfRequests; index++) {
RegisterApplicationMasterResponse registerResponse =
@ -244,11 +244,11 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
// Finish in reverse sequence
for (int index = numberOfRequests - 1; index >= 0; index--) {
FinishApplicationMasterResponse finshResponse =
FinishApplicationMasterResponse finishResponse =
finishApplicationMaster(index, FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
// Assert that the application has been removed from the collection
Assert.assertTrue(this.getAMRMProxyService()
@ -277,7 +277,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
}
@Test
public void testFinishMulitpleApplicationMastersInParallel()
public void testFinishMultipleApplicationMastersInParallel()
throws Exception {
int numberOfRequests = 5;
ArrayList<String> testContexts = new ArrayList<String>();
@ -308,12 +308,12 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
AllocateResponse allocateResponse = allocate(testAppId);
Assert.assertNotNull(allocateResponse);
FinishApplicationMasterResponse finshResponse =
FinishApplicationMasterResponse finishResponse =
finishApplicationMaster(testAppId,
FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
}
@Test
@ -399,7 +399,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
getContainersAndAssert(testAppId, 10);
releaseContainersAndAssert(testAppId, containers);
LOG.info("Sucessfully registered application master with appId: "
LOG.info("Successfully registered application master with appId: "
+ testAppId);
} catch (Throwable ex) {
LOG.error(
@ -533,7 +533,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
// We need to make sure all the resource managers received the
// release list. The containers sent by the mock resource managers will be
// aggregated and returned back to us and we can assert if all the release
// aggregated and returned back to us, and we can assert if all the release
// lists reached the sub-clusters
List<ContainerId> containersForReleasedContainerIds = new ArrayList<>();
List<ContainerId> newlyFinished = getCompletedContainerIds(
@ -607,19 +607,19 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
allocateResponse = allocate(testAppId1);
Assert.assertNotNull(allocateResponse);
FinishApplicationMasterResponse finshResponse =
FinishApplicationMasterResponse finishResponse =
finishApplicationMaster(testAppId1, FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
allocateResponse = allocate(testAppId2);
Assert.assertNotNull(allocateResponse);
finshResponse =
finishResponse =
finishApplicationMaster(testAppId2, FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
int testAppId3 = 3;
try {
@ -684,7 +684,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
}
/**
* A mock intercepter implementation that uses the same mockRM instance across
* A mock interceptor implementation that uses the same mockRM instance across
* restart.
*/
public static class MockRequestInterceptorAcrossRestart
@ -723,7 +723,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
}
/**
* A mock intercepter implementation that throws when recovering.
* A mock interceptor implementation that throws when recovering.
*/
public static class BadRequestInterceptorAcrossRestart
extends MockRequestInterceptorAcrossRestart {

View File

@ -88,7 +88,7 @@ import org.slf4j.LoggerFactory;
* Extends the TestAMRMProxyService and overrides methods in order to use the
* AMRMProxyService's pipeline test cases for testing the FederationInterceptor
* class. The tests for AMRMProxyService has been written cleverly so that it
* can be reused to validate different request intercepter chains.
* can be reused to validate different request interceptor chains.
*/
public class TestFederationInterceptor extends BaseAMRMProxyTest {
private static final Logger LOG =
@ -152,8 +152,8 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
String mockPassThroughInterceptorClass =
PassThroughRequestInterceptor.class.getName();
// Create a request intercepter pipeline for testing. The last one in the
// chain is the federation intercepter that calls the mock resource manager.
// Create a request interceptor pipeline for testing. The last one in the
// chain is the federation interceptor that calls the mock resource manager.
// The others in the chain will simply forward it to the next one in the
// chain
conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
@ -259,7 +259,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
// The release request will be split and handled by the corresponding UAM.
// The release containers returned by the mock resource managers will be
// aggregated and returned back to us and we can check if total request size
// aggregated and returned back to us, and we can check if total request size
// and returned size are the same
List<ContainerId> containersForReleasedContainerIds =
new ArrayList<ContainerId>();
@ -365,10 +365,10 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finshResponse =
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
return null;
}
@ -425,10 +425,10 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finshResponse =
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
return null;
}
});
@ -573,7 +573,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
interceptor.getSecondaryRMs();
// Create a new intercepter instance and recover
// Create a new interceptor instance and recover
interceptor = new TestableFederationInterceptor(homeRM, secondaries);
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
getConf(), attemptId, "test-user", null, null, null, registryObj));
@ -608,10 +608,10 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finshResponse =
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
// After the application succeeds, the registry/NMSS entry should be
// cleaned up
@ -849,7 +849,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
interceptor.getSecondaryRMs();
// Increase the attemptId and create a new intercepter instance for it
// Increase the attemptId and create a new interceptor instance for it
attemptId = ApplicationAttemptId.newInstance(
attemptId.getApplicationId(), attemptId.getAttemptId() + 1);
@ -888,10 +888,10 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finshResponse =
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
// After the application succeeds, the registry entry should be deleted
if (interceptor.getRegistryClient() != null) {

View File

@ -102,7 +102,7 @@ public class TestFederationInterceptorSecure extends BaseAMRMProxyTest {
private volatile int lastResponseId;
private MockResourceManagerFacade mockHomeRm = null;
private Server homeClietRMRpcServer;
private Server homeClientRMRpcServer;
private static File workDir;
private static final File TEST_ROOT_DIR =
@ -171,15 +171,15 @@ public class TestFederationInterceptorSecure extends BaseAMRMProxyTest {
private void startRpcServer() {
YarnRPC rpc = YarnRPC.create(conf);
this.homeClietRMRpcServer = rpc.getServer(ApplicationClientProtocol.class, mockHomeRm,
this.homeClientRMRpcServer = rpc.getServer(ApplicationClientProtocol.class, mockHomeRm,
NetUtils.createSocketAddr(HOME_RM_ADDRESS), conf, null, 2);
this.homeClietRMRpcServer.start();
this.homeClietRMRpcServer.refreshServiceAcl(conf, new MockRMPolicyProvider());
this.homeClientRMRpcServer.start();
this.homeClientRMRpcServer.refreshServiceAcl(conf, new MockRMPolicyProvider());
}
private void stopRpcServer() {
if (homeClietRMRpcServer != null) {
homeClietRMRpcServer.stop();
if (homeClientRMRpcServer != null) {
homeClientRMRpcServer.stop();
}
}
@ -200,8 +200,8 @@ public class TestFederationInterceptorSecure extends BaseAMRMProxyTest {
String mockPassThroughInterceptorClass =
PassThroughRequestInterceptor.class.getName();
// Create a request intercepter pipeline for testing. The last one in the
// chain is the federation intercepter that calls the mock resource manager.
// Create a request interceptor pipeline for testing. The last one in the
// chain is the federation interceptor that calls the mock resource manager.
// The others in the chain will simply forward it to the next one in the
// chain
conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
@ -410,7 +410,7 @@ public class TestFederationInterceptorSecure extends BaseAMRMProxyTest {
ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
interceptor.getSecondaryRMs();
// Create a new intercepter instance and recover
// Create a new interceptor instance and recover
interceptor = new TestableFederationInterceptor(homeRM, secondaries);
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, conf, attemptId,
"test-user", null, null, null, registryObj));
@ -445,10 +445,10 @@ public class TestFederationInterceptorSecure extends BaseAMRMProxyTest {
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finshResponse =
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
// After the application succeeds, the registry/NMSS entry should be
// cleaned up

View File

@ -258,7 +258,7 @@ public class TestableFederationInterceptor extends FederationInterceptor {
}
/**
* Wrap the handler thread so it calls from the same user.
* Wrap the handler thread, so it calls from the same user.
*/
protected class TestableAMRequestHandlerThread
extends AMHeartbeatRequestHandler {