MAPREDUCE-4703. Add the ability to start the MiniMRClientCluster using the configurations used before it is being stopped. (ahmed.radwan via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1419619 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ddd84ebedb
commit
89b9fc3034
|
@ -20,6 +20,9 @@ Release 2.0.3-alpha - Unreleased
|
||||||
HADOOP-8911. CRLF characters in source and text files.
|
HADOOP-8911. CRLF characters in source and text files.
|
||||||
(Raja Aluri via suresh)
|
(Raja Aluri via suresh)
|
||||||
|
|
||||||
|
MAPREDUCE-4703. Add the ability to start the MiniMRClientCluster using
|
||||||
|
the configurations used before it is being stopped. (ahmed.radwan via tucu)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -31,6 +31,11 @@ public interface MiniMRClientCluster {
|
||||||
|
|
||||||
public void start() throws IOException;
|
public void start() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop and start back the cluster using the same configuration.
|
||||||
|
*/
|
||||||
|
public void restart() throws IOException;
|
||||||
|
|
||||||
public void stop() throws IOException;
|
public void stop() throws IOException;
|
||||||
|
|
||||||
public Configuration getConfig() throws IOException;
|
public Configuration getConfig() throws IOException;
|
||||||
|
|
|
@ -67,6 +67,10 @@ public class MiniMRClientClusterFactory {
|
||||||
|
|
||||||
MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(caller
|
MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(caller
|
||||||
.getName(), noOfNMs);
|
.getName(), noOfNMs);
|
||||||
|
job.getConfiguration().set("minimrclientcluster.caller.name",
|
||||||
|
caller.getName());
|
||||||
|
job.getConfiguration().setInt("minimrclientcluster.nodemanagers.number",
|
||||||
|
noOfNMs);
|
||||||
miniMRYarnCluster.init(job.getConfiguration());
|
miniMRYarnCluster.init(job.getConfiguration());
|
||||||
miniMRYarnCluster.start();
|
miniMRYarnCluster.start();
|
||||||
|
|
||||||
|
|
|
@ -18,8 +18,13 @@
|
||||||
|
|
||||||
package org.apache.hadoop.mapred;
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
|
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.service.Service.STATE;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An adapter for MiniMRYarnCluster providing a MiniMRClientCluster interface.
|
* An adapter for MiniMRYarnCluster providing a MiniMRClientCluster interface.
|
||||||
|
@ -29,6 +34,8 @@ public class MiniMRYarnClusterAdapter implements MiniMRClientCluster {
|
||||||
|
|
||||||
private MiniMRYarnCluster miniMRYarnCluster;
|
private MiniMRYarnCluster miniMRYarnCluster;
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(MiniMRYarnClusterAdapter.class);
|
||||||
|
|
||||||
public MiniMRYarnClusterAdapter(MiniMRYarnCluster miniMRYarnCluster) {
|
public MiniMRYarnClusterAdapter(MiniMRYarnCluster miniMRYarnCluster) {
|
||||||
this.miniMRYarnCluster = miniMRYarnCluster;
|
this.miniMRYarnCluster = miniMRYarnCluster;
|
||||||
}
|
}
|
||||||
|
@ -48,4 +55,22 @@ public class MiniMRYarnClusterAdapter implements MiniMRClientCluster {
|
||||||
miniMRYarnCluster.stop();
|
miniMRYarnCluster.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void restart() {
|
||||||
|
if (!miniMRYarnCluster.getServiceState().equals(STATE.STARTED)){
|
||||||
|
LOG.warn("Cannot restart the mini cluster, start it first");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Configuration oldConf = new Configuration(getConfig());
|
||||||
|
String callerName = oldConf.get("minimrclientcluster.caller.name",
|
||||||
|
this.getClass().getName());
|
||||||
|
int noOfNMs = oldConf.getInt("minimrclientcluster.nodemanagers.number", 1);
|
||||||
|
oldConf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
|
||||||
|
oldConf.setBoolean(JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS, true);
|
||||||
|
stop();
|
||||||
|
miniMRYarnCluster = new MiniMRYarnCluster(callerName, noOfNMs);
|
||||||
|
miniMRYarnCluster.init(oldConf);
|
||||||
|
miniMRYarnCluster.start();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,8 @@ import org.apache.hadoop.io.IntWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapreduce.Counters;
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -91,6 +93,65 @@ public class TestMiniMRClientCluster {
|
||||||
mrCluster.stop();
|
mrCluster.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRestart() throws Exception {
|
||||||
|
|
||||||
|
String rmAddress1 = mrCluster.getConfig().get(YarnConfiguration.RM_ADDRESS);
|
||||||
|
String rmAdminAddress1 = mrCluster.getConfig().get(
|
||||||
|
YarnConfiguration.RM_ADMIN_ADDRESS);
|
||||||
|
String rmSchedAddress1 = mrCluster.getConfig().get(
|
||||||
|
YarnConfiguration.RM_SCHEDULER_ADDRESS);
|
||||||
|
String rmRstrackerAddress1 = mrCluster.getConfig().get(
|
||||||
|
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS);
|
||||||
|
String rmWebAppAddress1 = mrCluster.getConfig().get(
|
||||||
|
YarnConfiguration.RM_WEBAPP_ADDRESS);
|
||||||
|
|
||||||
|
String mrHistAddress1 = mrCluster.getConfig().get(
|
||||||
|
JHAdminConfig.MR_HISTORY_ADDRESS);
|
||||||
|
String mrHistWebAppAddress1 = mrCluster.getConfig().get(
|
||||||
|
JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS);
|
||||||
|
|
||||||
|
mrCluster.restart();
|
||||||
|
|
||||||
|
String rmAddress2 = mrCluster.getConfig().get(YarnConfiguration.RM_ADDRESS);
|
||||||
|
String rmAdminAddress2 = mrCluster.getConfig().get(
|
||||||
|
YarnConfiguration.RM_ADMIN_ADDRESS);
|
||||||
|
String rmSchedAddress2 = mrCluster.getConfig().get(
|
||||||
|
YarnConfiguration.RM_SCHEDULER_ADDRESS);
|
||||||
|
String rmRstrackerAddress2 = mrCluster.getConfig().get(
|
||||||
|
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS);
|
||||||
|
String rmWebAppAddress2 = mrCluster.getConfig().get(
|
||||||
|
YarnConfiguration.RM_WEBAPP_ADDRESS);
|
||||||
|
|
||||||
|
String mrHistAddress2 = mrCluster.getConfig().get(
|
||||||
|
JHAdminConfig.MR_HISTORY_ADDRESS);
|
||||||
|
String mrHistWebAppAddress2 = mrCluster.getConfig().get(
|
||||||
|
JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS);
|
||||||
|
|
||||||
|
assertEquals("Address before restart: " + rmAddress1
|
||||||
|
+ " is different from new address: " + rmAddress2, rmAddress1,
|
||||||
|
rmAddress2);
|
||||||
|
assertEquals("Address before restart: " + rmAdminAddress1
|
||||||
|
+ " is different from new address: " + rmAdminAddress2,
|
||||||
|
rmAdminAddress1, rmAdminAddress2);
|
||||||
|
assertEquals("Address before restart: " + rmSchedAddress1
|
||||||
|
+ " is different from new address: " + rmSchedAddress2,
|
||||||
|
rmSchedAddress1, rmSchedAddress2);
|
||||||
|
assertEquals("Address before restart: " + rmRstrackerAddress1
|
||||||
|
+ " is different from new address: " + rmRstrackerAddress2,
|
||||||
|
rmRstrackerAddress1, rmRstrackerAddress2);
|
||||||
|
assertEquals("Address before restart: " + rmWebAppAddress1
|
||||||
|
+ " is different from new address: " + rmWebAppAddress2,
|
||||||
|
rmWebAppAddress1, rmWebAppAddress2);
|
||||||
|
assertEquals("Address before restart: " + mrHistAddress1
|
||||||
|
+ " is different from new address: " + mrHistAddress2, mrHistAddress1,
|
||||||
|
mrHistAddress2);
|
||||||
|
assertEquals("Address before restart: " + mrHistWebAppAddress1
|
||||||
|
+ " is different from new address: " + mrHistWebAppAddress2,
|
||||||
|
mrHistWebAppAddress1, mrHistWebAppAddress2);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testJob() throws Exception {
|
public void testJob() throws Exception {
|
||||||
final Job job = createJob();
|
final Job job = createJob();
|
||||||
|
|
Loading…
Reference in New Issue