YARN-4117. End to end unit test with mini YARN cluster for AMRMProxy Service. Contributed by Giovanni Matteo Fumarola
This commit is contained in:
parent
4212f2e2bf
commit
bdc648ebe7
|
@ -28,6 +28,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
|
@ -512,6 +513,16 @@ public class AMRMProxyService extends AbstractService implements
|
|||
return null;
|
||||
}
|
||||
|
||||
@Private
|
||||
public InetSocketAddress getBindAddress() {
|
||||
return this.listenerEndpoint;
|
||||
}
|
||||
|
||||
@Private
|
||||
public AMRMProxyTokenSecretManager getSecretManager() {
|
||||
return this.secretManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Private class for handling application stop events.
|
||||
*
|
||||
|
@ -546,7 +557,8 @@ public class AMRMProxyService extends AbstractService implements
|
|||
* ApplicationAttemptId instances.
|
||||
*
|
||||
*/
|
||||
private static class RequestInterceptorChainWrapper {
|
||||
@Private
|
||||
public static class RequestInterceptorChainWrapper {
|
||||
private RequestInterceptor rootInterceptor;
|
||||
private ApplicationAttemptId applicationAttemptId;
|
||||
|
||||
|
|
|
@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Extends the AbstractRequestInterceptor class and provides an implementation
|
||||
* that simply forwards the AM requests to the cluster resource manager.
|
||||
|
@ -135,4 +137,9 @@ public final class DefaultRequestInterceptor extends
|
|||
user.addToken(amrmToken);
|
||||
amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setRMClient(ApplicationMasterProtocol rmClient) {
|
||||
this.rmClient = rmClient;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -183,7 +183,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
private final ReadLock readLock;
|
||||
private final WriteLock writeLock;
|
||||
private AMRMProxyService amrmProxyService;
|
||||
private boolean amrmProxyEnabled = false;
|
||||
protected boolean amrmProxyEnabled = false;
|
||||
|
||||
private long waitForContainersOnShutdownMillis;
|
||||
|
||||
|
@ -247,19 +247,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
addService(sharedCacheUploader);
|
||||
dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
|
||||
|
||||
amrmProxyEnabled =
|
||||
conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
|
||||
YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
|
||||
|
||||
if (amrmProxyEnabled) {
|
||||
LOG.info("AMRMProxyService is enabled. "
|
||||
+ "All the AM->RM requests will be intercepted by the proxy");
|
||||
this.amrmProxyService =
|
||||
new AMRMProxyService(this.context, this.dispatcher);
|
||||
addService(this.amrmProxyService);
|
||||
} else {
|
||||
LOG.info("AMRMProxyService is disabled");
|
||||
}
|
||||
createAMRMProxyService(conf);
|
||||
|
||||
waitForContainersOnShutdownMillis =
|
||||
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
|
||||
|
@ -272,8 +260,20 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
recover();
|
||||
}
|
||||
|
||||
public boolean isARMRMProxyEnabled() {
|
||||
return amrmProxyEnabled;
|
||||
protected void createAMRMProxyService(Configuration conf) {
|
||||
this.amrmProxyEnabled =
|
||||
conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
|
||||
YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
|
||||
|
||||
if (amrmProxyEnabled) {
|
||||
LOG.info("AMRMProxyService is enabled. "
|
||||
+ "All the AM->RM requests will be intercepted by the proxy");
|
||||
this.setAMRMProxyService(
|
||||
new AMRMProxyService(this.context, this.dispatcher));
|
||||
addService(this.getAMRMProxyService());
|
||||
} else {
|
||||
LOG.info("AMRMProxyService is disabled");
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -810,9 +810,9 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
|
||||
// Initialize the AMRMProxy service instance only if the container is of
|
||||
// type AM and if the AMRMProxy service is enabled
|
||||
if (isARMRMProxyEnabled() && containerTokenIdentifier
|
||||
.getContainerType().equals(ContainerType.APPLICATION_MASTER)) {
|
||||
this.amrmProxyService.processApplicationStartRequest(request);
|
||||
if (amrmProxyEnabled && containerTokenIdentifier.getContainerType()
|
||||
.equals(ContainerType.APPLICATION_MASTER)) {
|
||||
this.getAMRMProxyService().processApplicationStartRequest(request);
|
||||
}
|
||||
|
||||
startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
|
||||
|
@ -1413,4 +1413,15 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
public Map<String, ByteBuffer> getAuxServiceMetaData() {
|
||||
return this.auxiliaryServices.getMetaData();
|
||||
}
|
||||
|
||||
@Private
|
||||
public AMRMProxyService getAMRMProxyService() {
|
||||
return this.amrmProxyService;
|
||||
}
|
||||
|
||||
@Private
|
||||
protected void setAMRMProxyService(AMRMProxyService amrmProxyService) {
|
||||
this.amrmProxyService = amrmProxyService;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -35,21 +35,23 @@ import org.apache.hadoop.fs.FileContext;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
|
@ -61,24 +63,31 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
|||
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
|
||||
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
||||
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
|
||||
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
|
||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -698,6 +707,15 @@ public class MiniYARNCluster extends CompositeService {
|
|||
protected void stopRMProxy() { }
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ContainerManagerImpl createContainerManager(Context context,
|
||||
ContainerExecutor exec, DeletionService del,
|
||||
NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
|
||||
LocalDirsHandlerService dirsHandler) {
|
||||
return new CustomContainerManagerImpl(context, exec, del,
|
||||
nodeStatusUpdater, metrics, dirsHandler);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -799,4 +817,55 @@ public class MiniYARNCluster extends CompositeService {
|
|||
public int getNumOfResourceManager() {
|
||||
return this.resourceManagers.length;
|
||||
}
|
||||
|
||||
private class CustomContainerManagerImpl extends ContainerManagerImpl {
|
||||
|
||||
public CustomContainerManagerImpl(Context context, ContainerExecutor exec,
|
||||
DeletionService del, NodeStatusUpdater nodeStatusUpdater,
|
||||
NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
|
||||
super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void createAMRMProxyService(Configuration conf) {
|
||||
this.amrmProxyEnabled =
|
||||
conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
|
||||
YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
|
||||
|
||||
if (this.amrmProxyEnabled) {
|
||||
LOG.info("CustomAMRMProxyService is enabled. "
|
||||
+ "All the AM->RM requests will be intercepted by the proxy");
|
||||
AMRMProxyService amrmProxyService =
|
||||
useRpc ? new AMRMProxyService(getContext(), dispatcher)
|
||||
: new ShortCircuitedAMRMProxy(getContext(), dispatcher);
|
||||
this.setAMRMProxyService(amrmProxyService);
|
||||
addService(this.getAMRMProxyService());
|
||||
} else {
|
||||
LOG.info("CustomAMRMProxyService is disabled");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class ShortCircuitedAMRMProxy extends AMRMProxyService {
|
||||
|
||||
public ShortCircuitedAMRMProxy(Context context,
|
||||
AsyncDispatcher dispatcher) {
|
||||
super(context, dispatcher);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
|
||||
String user, Token<AMRMTokenIdentifier> amrmToken,
|
||||
Token<AMRMTokenIdentifier> localToken) {
|
||||
super.initializePipeline(applicationAttemptId, user, amrmToken,
|
||||
localToken);
|
||||
RequestInterceptor rt = getPipelines()
|
||||
.get(applicationAttemptId.getApplicationId()).getRootInterceptor();
|
||||
if (rt instanceof DefaultRequestInterceptor) {
|
||||
((DefaultRequestInterceptor) rt)
|
||||
.setRMClient(getResourceManager().getApplicationMasterService());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue