MAPREDUCE-4703. Add the ability to start the MiniMRClientCluster using the configurations used before it is being stopped. (ahmed.radwan via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1419618 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8f5bd5f1bc
commit
054270cc28
|
@ -171,6 +171,9 @@ Release 2.0.3-alpha - Unreleased
|
|||
|
||||
MAPREDUCE-4723. Fix warnings found by findbugs 2. (Sandy Ryza via eli)
|
||||
|
||||
MAPREDUCE-4703. Add the ability to start the MiniMRClientCluster using
|
||||
the configurations used before it is being stopped. (ahmed.radwan via tucu)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -31,6 +31,11 @@ public interface MiniMRClientCluster {
|
|||
|
||||
public void start() throws IOException;
|
||||
|
||||
/**
|
||||
* Stop and start back the cluster using the same configuration.
|
||||
*/
|
||||
public void restart() throws IOException;
|
||||
|
||||
public void stop() throws IOException;
|
||||
|
||||
public Configuration getConfig() throws IOException;
|
||||
|
|
|
@ -67,6 +67,10 @@ public class MiniMRClientClusterFactory {
|
|||
|
||||
MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(caller
|
||||
.getName(), noOfNMs);
|
||||
job.getConfiguration().set("minimrclientcluster.caller.name",
|
||||
caller.getName());
|
||||
job.getConfiguration().setInt("minimrclientcluster.nodemanagers.number",
|
||||
noOfNMs);
|
||||
miniMRYarnCluster.init(job.getConfiguration());
|
||||
miniMRYarnCluster.start();
|
||||
|
||||
|
|
|
@ -18,8 +18,13 @@
|
|||
|
||||
package org.apache.hadoop.mapred;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.service.Service.STATE;
|
||||
|
||||
/**
|
||||
* An adapter for MiniMRYarnCluster providing a MiniMRClientCluster interface.
|
||||
|
@ -29,6 +34,8 @@ public class MiniMRYarnClusterAdapter implements MiniMRClientCluster {
|
|||
|
||||
private MiniMRYarnCluster miniMRYarnCluster;
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(MiniMRYarnClusterAdapter.class);
|
||||
|
||||
public MiniMRYarnClusterAdapter(MiniMRYarnCluster miniMRYarnCluster) {
|
||||
this.miniMRYarnCluster = miniMRYarnCluster;
|
||||
}
|
||||
|
@ -48,4 +55,22 @@ public class MiniMRYarnClusterAdapter implements MiniMRClientCluster {
|
|||
miniMRYarnCluster.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restart() {
|
||||
if (!miniMRYarnCluster.getServiceState().equals(STATE.STARTED)){
|
||||
LOG.warn("Cannot restart the mini cluster, start it first");
|
||||
return;
|
||||
}
|
||||
Configuration oldConf = new Configuration(getConfig());
|
||||
String callerName = oldConf.get("minimrclientcluster.caller.name",
|
||||
this.getClass().getName());
|
||||
int noOfNMs = oldConf.getInt("minimrclientcluster.nodemanagers.number", 1);
|
||||
oldConf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
|
||||
oldConf.setBoolean(JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS, true);
|
||||
stop();
|
||||
miniMRYarnCluster = new MiniMRYarnCluster(callerName, noOfNMs);
|
||||
miniMRYarnCluster.init(oldConf);
|
||||
miniMRYarnCluster.start();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.apache.hadoop.io.IntWritable;
|
|||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
@ -91,6 +93,65 @@ public class TestMiniMRClientCluster {
|
|||
mrCluster.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRestart() throws Exception {
|
||||
|
||||
String rmAddress1 = mrCluster.getConfig().get(YarnConfiguration.RM_ADDRESS);
|
||||
String rmAdminAddress1 = mrCluster.getConfig().get(
|
||||
YarnConfiguration.RM_ADMIN_ADDRESS);
|
||||
String rmSchedAddress1 = mrCluster.getConfig().get(
|
||||
YarnConfiguration.RM_SCHEDULER_ADDRESS);
|
||||
String rmRstrackerAddress1 = mrCluster.getConfig().get(
|
||||
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS);
|
||||
String rmWebAppAddress1 = mrCluster.getConfig().get(
|
||||
YarnConfiguration.RM_WEBAPP_ADDRESS);
|
||||
|
||||
String mrHistAddress1 = mrCluster.getConfig().get(
|
||||
JHAdminConfig.MR_HISTORY_ADDRESS);
|
||||
String mrHistWebAppAddress1 = mrCluster.getConfig().get(
|
||||
JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS);
|
||||
|
||||
mrCluster.restart();
|
||||
|
||||
String rmAddress2 = mrCluster.getConfig().get(YarnConfiguration.RM_ADDRESS);
|
||||
String rmAdminAddress2 = mrCluster.getConfig().get(
|
||||
YarnConfiguration.RM_ADMIN_ADDRESS);
|
||||
String rmSchedAddress2 = mrCluster.getConfig().get(
|
||||
YarnConfiguration.RM_SCHEDULER_ADDRESS);
|
||||
String rmRstrackerAddress2 = mrCluster.getConfig().get(
|
||||
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS);
|
||||
String rmWebAppAddress2 = mrCluster.getConfig().get(
|
||||
YarnConfiguration.RM_WEBAPP_ADDRESS);
|
||||
|
||||
String mrHistAddress2 = mrCluster.getConfig().get(
|
||||
JHAdminConfig.MR_HISTORY_ADDRESS);
|
||||
String mrHistWebAppAddress2 = mrCluster.getConfig().get(
|
||||
JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS);
|
||||
|
||||
assertEquals("Address before restart: " + rmAddress1
|
||||
+ " is different from new address: " + rmAddress2, rmAddress1,
|
||||
rmAddress2);
|
||||
assertEquals("Address before restart: " + rmAdminAddress1
|
||||
+ " is different from new address: " + rmAdminAddress2,
|
||||
rmAdminAddress1, rmAdminAddress2);
|
||||
assertEquals("Address before restart: " + rmSchedAddress1
|
||||
+ " is different from new address: " + rmSchedAddress2,
|
||||
rmSchedAddress1, rmSchedAddress2);
|
||||
assertEquals("Address before restart: " + rmRstrackerAddress1
|
||||
+ " is different from new address: " + rmRstrackerAddress2,
|
||||
rmRstrackerAddress1, rmRstrackerAddress2);
|
||||
assertEquals("Address before restart: " + rmWebAppAddress1
|
||||
+ " is different from new address: " + rmWebAppAddress2,
|
||||
rmWebAppAddress1, rmWebAppAddress2);
|
||||
assertEquals("Address before restart: " + mrHistAddress1
|
||||
+ " is different from new address: " + mrHistAddress2, mrHistAddress1,
|
||||
mrHistAddress2);
|
||||
assertEquals("Address before restart: " + mrHistWebAppAddress1
|
||||
+ " is different from new address: " + mrHistWebAppAddress2,
|
||||
mrHistWebAppAddress1, mrHistWebAppAddress2);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJob() throws Exception {
|
||||
final Job job = createJob();
|
||||
|
|
Loading…
Reference in New Issue