MAPREDUCE-3040. Fixed extra copy of Configuration in YarnClientProtocolProvider and ensured MiniMRYarnCluster sets JobHistory configuration for tests.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1172929 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9d14f100c8
commit
f0fedda8ef
|
@ -1366,6 +1366,10 @@ Release 0.23.0 - Unreleased
|
||||||
MAPREDUCE-3017. The Web UI shows FINISHED for killed/successful/failed jobs.
|
MAPREDUCE-3017. The Web UI shows FINISHED for killed/successful/failed jobs.
|
||||||
(mahadev)
|
(mahadev)
|
||||||
|
|
||||||
|
MAPREDUCE-3040. Fixed extra copy of Configuration in
|
||||||
|
YarnClientProtocolProvider and ensured MiniMRYarnCluster sets JobHistory
|
||||||
|
configuration for tests. (acmurthy)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -77,7 +77,6 @@ public class ClientCache {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
|
final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
|
||||||
if (StringUtils.isEmpty(serviceAddr)) {
|
if (StringUtils.isEmpty(serviceAddr)) {
|
||||||
LOG.info("HistoryServer is not configured.");
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
LOG.info("Connecting to HistoryServer at: " + serviceAddr);
|
LOG.info("Connecting to HistoryServer at: " + serviceAddr);
|
||||||
|
|
|
@ -74,16 +74,16 @@ import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
|
||||||
public class ResourceMgrDelegate {
|
public class ResourceMgrDelegate {
|
||||||
private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
|
private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
|
||||||
|
|
||||||
private Configuration conf;
|
private YarnConfiguration conf;
|
||||||
ClientRMProtocol applicationsManager;
|
ClientRMProtocol applicationsManager;
|
||||||
private ApplicationId applicationId;
|
private ApplicationId applicationId;
|
||||||
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||||
|
|
||||||
public ResourceMgrDelegate(Configuration conf) {
|
public ResourceMgrDelegate(YarnConfiguration conf) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
YarnRPC rpc = YarnRPC.create(conf);
|
YarnRPC rpc = YarnRPC.create(this.conf);
|
||||||
InetSocketAddress rmAddress =
|
InetSocketAddress rmAddress =
|
||||||
NetUtils.createSocketAddr(conf.get(
|
NetUtils.createSocketAddr(this.conf.get(
|
||||||
YarnConfiguration.RM_ADDRESS,
|
YarnConfiguration.RM_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_ADDRESS));
|
YarnConfiguration.DEFAULT_RM_ADDRESS));
|
||||||
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
||||||
|
|
|
@ -20,16 +20,13 @@ package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Vector;
|
import java.util.Vector;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -43,7 +40,6 @@ import org.apache.hadoop.mapreduce.Counters;
|
||||||
import org.apache.hadoop.mapreduce.JobContext;
|
import org.apache.hadoop.mapreduce.JobContext;
|
||||||
import org.apache.hadoop.mapreduce.JobID;
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
import org.apache.hadoop.mapreduce.JobStatus;
|
import org.apache.hadoop.mapreduce.JobStatus;
|
||||||
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
|
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.QueueAclsInfo;
|
import org.apache.hadoop.mapreduce.QueueAclsInfo;
|
||||||
import org.apache.hadoop.mapreduce.QueueInfo;
|
import org.apache.hadoop.mapreduce.QueueInfo;
|
||||||
|
@ -62,7 +58,6 @@ import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -91,7 +86,7 @@ public class YARNRunner implements ClientProtocol {
|
||||||
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||||
private ResourceMgrDelegate resMgrDelegate;
|
private ResourceMgrDelegate resMgrDelegate;
|
||||||
private ClientCache clientCache;
|
private ClientCache clientCache;
|
||||||
private YarnConfiguration conf;
|
private Configuration conf;
|
||||||
private final FileContext defaultFileContext;
|
private final FileContext defaultFileContext;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -99,22 +94,21 @@ public class YARNRunner implements ClientProtocol {
|
||||||
* yarn
|
* yarn
|
||||||
* @param conf the configuration object for the client
|
* @param conf the configuration object for the client
|
||||||
*/
|
*/
|
||||||
public YARNRunner(YarnConfiguration conf) {
|
public YARNRunner(Configuration conf) {
|
||||||
this(conf, new ResourceMgrDelegate(conf));
|
this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Similar to {@link #YARNRunner(YarnConfiguration)} but allowing injecting
|
* Similar to {@link #YARNRunner(Configuration)} but allowing injecting
|
||||||
* {@link ResourceMgrDelegate}. Enables mocking and testing.
|
* {@link ResourceMgrDelegate}. Enables mocking and testing.
|
||||||
* @param conf the configuration object for the client
|
* @param conf the configuration object for the client
|
||||||
* @param resMgrDelegate the resourcemanager client handle.
|
* @param resMgrDelegate the resourcemanager client handle.
|
||||||
*/
|
*/
|
||||||
public YARNRunner(YarnConfiguration conf, ResourceMgrDelegate resMgrDelegate) {
|
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
try {
|
try {
|
||||||
this.resMgrDelegate = resMgrDelegate;
|
this.resMgrDelegate = resMgrDelegate;
|
||||||
this.clientCache = new ClientCache(this.conf,
|
this.clientCache = new ClientCache(this.conf, resMgrDelegate);
|
||||||
resMgrDelegate);
|
|
||||||
this.defaultFileContext = FileContext.getFileContext(this.conf);
|
this.defaultFileContext = FileContext.getFileContext(this.conf);
|
||||||
} catch (UnsupportedFileSystemException ufe) {
|
} catch (UnsupportedFileSystemException ufe) {
|
||||||
throw new RuntimeException("Error in instantiating YarnClient", ufe);
|
throw new RuntimeException("Error in instantiating YarnClient", ufe);
|
||||||
|
|
|
@ -25,14 +25,13 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
|
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
|
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
||||||
|
|
||||||
public class YarnClientProtocolProvider extends ClientProtocolProvider {
|
public class YarnClientProtocolProvider extends ClientProtocolProvider {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClientProtocol create(Configuration conf) throws IOException {
|
public ClientProtocol create(Configuration conf) throws IOException {
|
||||||
if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
|
if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
|
||||||
return new YARNRunner(new YarnConfiguration(conf));
|
return new YARNRunner(conf);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.mapred.ShuffleHandler;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
|
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||||
|
@ -82,6 +83,10 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
|
||||||
// for corresponding uberized tests.
|
// for corresponding uberized tests.
|
||||||
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||||
|
|
||||||
|
// Set config for JH Server
|
||||||
|
conf.set(JHAdminConfig.MR_HISTORY_ADDRESS,
|
||||||
|
JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS);
|
||||||
|
|
||||||
super.init(conf);
|
super.init(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue