From ca333f17c125735c1de412bb27db599ee752b24c Mon Sep 17 00:00:00 2001
From: Todd Lipcon
Date: Fri, 16 Sep 2011 22:22:34 +0000
Subject: [PATCH] MAPREDUCE-2836. Provide option to fail jobs when submitted
 to non-existent fair scheduler pools. Contributed by Ahmed Radwan.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1171832 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt               |   3 +
 .../apache/hadoop/mapred/FairScheduler.java        |  31 ++-
 .../org/apache/hadoop/mapred/PoolManager.java      |  11 ++
 .../mapred/UndeclaredPoolException.java            |  32 +++
 .../mapred/TestFairSchedulerPoolNames.java         | 182 ++++++++++++++++++
 .../content/xdocs/fair_scheduler.xml               |   9 +
 .../org/apache/hadoop/mapred/JobTracker.java       |   7 +
 .../apache/hadoop/mapred/TaskScheduler.java        |  10 +
 8 files changed, 284 insertions(+), 1 deletion(-)
 create mode 100644 hadoop-mapreduce-project/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/UndeclaredPoolException.java
 create mode 100644 hadoop-mapreduce-project/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerPoolNames.java

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 57d2d510feb..68341f0458b 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -17,6 +17,9 @@ Trunk (unreleased changes)
     MAPREDUCE-2934. MR portion of HADOOP-7607 - Simplify the RPC proxy cleanup
     process (atm)
 
+    MAPREDUCE-2836. Provide option to fail jobs when submitted to non-existent
+    fair scheduler pools. (Ahmed Radwan via todd)
+
   BUG FIXES
 
     MAPREDUCE-2784. [Gridmix] Bug fixes in ExecutionSummarizer and
diff --git a/hadoop-mapreduce-project/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java b/hadoop-mapreduce-project/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
index d0ac659e579..89d6d385ce8 100644
--- a/hadoop-mapreduce-project/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
+++ b/hadoop-mapreduce-project/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * A {@link TaskScheduler} that implements fair sharing.
@@ -97,7 +98,15 @@ public class FairScheduler extends TaskScheduler {
   protected long lastDumpTime;       // Time when we last dumped state to log
   protected long lastHeartbeatTime;  // Time we last ran assignTasks
   private long lastPreemptCheckTime; // Time we last ran preemptTasksIfNecessary
-  
+
+  /**
+   * A configuration property that controls the ability of submitting jobs to
+   * pools not declared in the scheduler allocation file.
+   */
+  public final static String ALLOW_UNDECLARED_POOLS_KEY =
+    "mapred.fairscheduler.allow.undeclared.pools";
+  private boolean allowUndeclaredPools = false;
+
   /**
    * A class for holding per-job scheduler variables. These always contain the
   * values of the variables at the last update(), and are used along with a
@@ -195,6 +204,8 @@ public class FairScheduler extends TaskScheduler {
         "mapred.fairscheduler.locality.delay.node", defaultDelay);
     rackLocalityDelay = conf.getLong(
         "mapred.fairscheduler.locality.delay.rack", defaultDelay);
+    allowUndeclaredPools = conf.getBoolean(ALLOW_UNDECLARED_POOLS_KEY, true);
+
     if (defaultDelay == -1 &&
         (nodeLocalityDelay == -1 || rackLocalityDelay == -1)) {
       autoComputeLocalityDelay = true; // Compute from heartbeat interval
@@ -1098,4 +1109,22 @@ public class FairScheduler extends TaskScheduler {
   long getLastPreemptionUpdateTime() {
     return lastPreemptionUpdateTime;
   }
+
+  /**
+   * Examines the job's pool name to determine if it is a declared pool name (in
+   * the scheduler allocation file).
+   */
+  @Override
+  public void checkJobSubmission(JobInProgress job)
+      throws UndeclaredPoolException {
+    Set<String> declaredPools = poolMgr.getDeclaredPools();
+    if (!this.allowUndeclaredPools
+        && !declaredPools.contains(poolMgr.getPoolName(job)))
+      throw new UndeclaredPoolException("Pool name: '"
+          + poolMgr.getPoolName(job)
+          + "' is invalid. Add pool name to the fair scheduler allocation "
+          + "file. Valid pools are: "
+          + StringUtils.join(", ", declaredPools));
+  }
+
 }
diff --git a/hadoop-mapreduce-project/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java b/hadoop-mapreduce-project/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
index 08074f180c3..f0d17d80a80 100644
--- a/hadoop-mapreduce-project/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
+++ b/hadoop-mapreduce-project/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
@@ -28,6 +28,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -114,6 +116,8 @@ public class PoolManager {
   private long lastSuccessfulReload; // Last time we successfully reloaded pools
   private boolean lastReloadAttemptFailed = false;
 
+  private Set<String> declaredPools = new TreeSet<String>();
+
   public PoolManager(FairScheduler scheduler) {
     this.scheduler = scheduler;
   }
@@ -370,6 +374,8 @@ public class PoolManager {
       this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
       this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
       this.defaultSchedulingMode = defaultSchedulingMode;
+      this.declaredPools = Collections.unmodifiableSet(new TreeSet<String>(
+          poolNamesInAllocFile));
       for (String name: poolNamesInAllocFile) {
         Pool pool = getPool(name);
         if (poolModes.containsKey(name)) {
@@ -543,4 +549,9 @@ public class PoolManager {
       pool.updateMetrics();
     }
   }
+
+  public synchronized Set<String> getDeclaredPools() {
+    return declaredPools;
+  }
+
 }
diff --git a/hadoop-mapreduce-project/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/UndeclaredPoolException.java b/hadoop-mapreduce-project/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/UndeclaredPoolException.java
new file mode 100644
index 00000000000..cdd0ef68fc3
--- /dev/null
+++ b/hadoop-mapreduce-project/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/UndeclaredPoolException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * Thrown when the pool is not declared in the fair scheduler allocation file.
+ */
+public class UndeclaredPoolException extends IOException {
+
+  private static final long serialVersionUID = -3559057276650280117L;
+
+  public UndeclaredPoolException(String message) {
+    super(message);
+  }
+}
diff --git a/hadoop-mapreduce-project/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerPoolNames.java b/hadoop-mapreduce-project/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerPoolNames.java
new file mode 100644
index 00000000000..4e4351556b3
--- /dev/null
+++ b/hadoop-mapreduce-project/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerPoolNames.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFairSchedulerPoolNames {
+
+  final static String TEST_DIR = new File(System.getProperty("test.build.data",
+      "build/contrib/streaming/test/data")).getAbsolutePath();
+  final static String ALLOC_FILE = new File(TEST_DIR, "test-pools")
+      .getAbsolutePath();
+
+  private static final String POOL_PROPERTY = "pool";
+  private String namenode;
+  private MiniDFSCluster miniDFSCluster = null;
+  private MiniMRCluster miniMRCluster = null;
+
+  /**
+   * Note that the PoolManager.ALLOW_UNDECLARED_POOLS_KEY property is set to
+   * false. So, the default pool is not added, and only pool names in the
+   * scheduler allocation file are considered valid.
+   */
+  @Before
+  public void setUp() throws Exception {
+    new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+    // Create an allocation file with only one pool defined.
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>1</minMaps>");
+    out.println("<minReduces>1</minReduces>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+
+    Configuration conf = new Configuration();
+    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
+    namenode = miniDFSCluster.getFileSystem().getUri().toString();
+
+    JobConf clusterConf = new JobConf();
+    clusterConf.set("mapred.jobtracker.taskScheduler", FairScheduler.class
+        .getName());
+    clusterConf.set("mapred.fairscheduler.allocation.file", ALLOC_FILE);
+    clusterConf.set("mapred.fairscheduler.poolnameproperty", POOL_PROPERTY);
+    clusterConf.setBoolean(FairScheduler.ALLOW_UNDECLARED_POOLS_KEY, false);
+    miniMRCluster = new MiniMRCluster(1, namenode, 1, null, null, clusterConf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (miniDFSCluster != null) {
+      miniDFSCluster.shutdown();
+    }
+    if (miniMRCluster != null) {
+      miniMRCluster.shutdown();
+    }
+  }
+
+  private void submitJob(String pool) throws IOException {
+    JobConf conf = new JobConf();
+    final Path inDir = new Path("/tmp/testing/wc/input");
+    final Path outDir = new Path("/tmp/testing/wc/output");
+    FileSystem fs = FileSystem.get(URI.create(namenode), conf);
+    fs.delete(outDir, true);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    DataOutputStream file = fs.create(new Path(inDir, "part-00000"));
+    file.writeBytes("Sample text");
+    file.close();
+
+    FileSystem.setDefaultUri(conf, namenode);
+    conf.set("mapred.job.tracker", "localhost:"
+        + miniMRCluster.getJobTrackerPort());
+    conf.setJobName("wordcount");
+    conf.setInputFormat(TextInputFormat.class);
+
+    // the keys are words (strings)
+    conf.setOutputKeyClass(Text.class);
+    // the values are counts (ints)
+    conf.setOutputValueClass(IntWritable.class);
+
+    conf.setMapperClass(WordCount.MapClass.class);
+    conf.setCombinerClass(WordCount.Reduce.class);
+    conf.setReducerClass(WordCount.Reduce.class);
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(0);
+
+    if (pool != null) {
+      conf.set(POOL_PROPERTY, pool);
+    }
+
+    JobClient.runJob(conf);
+  }
+
+  /**
+   * Tests job submission using the default pool name.
+   */
+  @Test
+  public void testDefaultPoolName() {
+    Throwable t = null;
+    try {
+      submitJob(null);
+    } catch (Exception e) {
+      t = e;
+    }
+    assertNotNull("No exception during submission", t);
+    assertTrue("Incorrect exception message", t.getMessage().contains(
+        "Add pool name to the fair scheduler allocation file"));
+  }
+
+  /**
+   * Tests job submission using a valid pool name (i.e., name exists in the fair
+   * scheduler allocation file).
+   */
+  @Test
+  public void testValidPoolName() {
+    Throwable t = null;
+    try {
+      submitJob("poolA");
+    } catch (Exception e) {
+      t = e;
+    }
+    assertNull("Exception during submission", t);
+  }
+
+  /**
+   * Tests job submission using an invalid pool name (i.e., name doesn't exist
+   * in the fair scheduler allocation file).
+   */
+  @Test
+  public void testInvalidPoolName() {
+    Throwable t = null;
+    try {
+      submitJob("poolB");
+    } catch (Exception e) {
+      t = e;
+    }
+    assertNotNull("No exception during submission", t);
+    assertTrue("Incorrect exception message", t.getMessage().contains(
+        "Add pool name to the fair scheduler allocation file"));
+  }
+
+}
diff --git a/hadoop-mapreduce-project/src/docs/src/documentation/content/xdocs/fair_scheduler.xml b/hadoop-mapreduce-project/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
index b4d8893f0cc..5069943c250 100644
--- a/hadoop-mapreduce-project/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
+++ b/hadoop-mapreduce-project/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
@@ -192,6 +192,15 @@
+        <tr>
+          <td>
+            mapred.fairscheduler.allow.undeclared.pools
+          </td>
+          <td>
+            Boolean property for enabling job submission to pools not declared
+            in the allocation file. Default: true.
+          </td>
+        </tr>
         <tr>
           <td>
             mapred.fairscheduler.allocation.file
diff --git a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTracker.java b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTracker.java
index 12091642317..870d02f5f8c 100644
--- a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTracker.java
+++ b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTracker.java
@@ -3161,6 +3161,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       throw ace;
     }
 
+    try {
+      this.taskScheduler.checkJobSubmission(job);
+    } catch (IOException ioe){
+      LOG.error("Problem in submitting job " + jobId, ioe);
+      throw ioe;
+    }
+
     // Check the job if it cannot run in the cluster because of invalid memory
     // requirements.
     try {
diff --git a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/TaskScheduler.java b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/TaskScheduler.java
index afee66afdbb..d786346c0bd 100644
--- a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/TaskScheduler.java
+++ b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/TaskScheduler.java
@@ -104,4 +104,14 @@ abstract class TaskScheduler implements Configurable {
   QueueRefresher getQueueRefresher() {
     return null;
   }
+
+  /**
+   * Subclasses can override to provide any scheduler-specific checking
+   * mechanism for job submission.
+   * @param job
+   * @throws IOException
+   */
+  public void checkJobSubmission(JobInProgress job) throws IOException{
+  }
+
 }
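
Reviewer note: the TaskScheduler hunk above adds checkJobSubmission() as a no-op hook that JobTracker.submitJob() now calls before a job is accepted, and the FairScheduler overrides it to reject undeclared pools. Below is a minimal sketch of how another scheduler could reuse the same hook; the QuotaTaskScheduler class name and the reduce-count limit are purely illustrative and are not part of this patch.

    // Illustrative only: a hypothetical scheduler that vetoes jobs asking for
    // too many reduces. It lives in org.apache.hadoop.mapred because
    // TaskScheduler is package-private, and extends the default
    // JobQueueTaskScheduler so the abstract scheduling methods are inherited.
    package org.apache.hadoop.mapred;

    import java.io.IOException;

    public class QuotaTaskScheduler extends JobQueueTaskScheduler {

      private static final int MAX_REDUCES = 100; // hypothetical limit

      @Override
      public void checkJobSubmission(JobInProgress job) throws IOException {
        // Throwing here makes JobTracker.submitJob() fail the submission, so
        // the error is reported back to the submitting client.
        if (job.getJobConf().getNumReduceTasks() > MAX_REDUCES) {
          throw new IOException("Job " + job.getJobID() + " requests more than "
              + MAX_REDUCES + " reduce tasks");
        }
      }
    }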
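Reviewer note: from a client's point of view, when mapred.fairscheduler.allow.undeclared.pools is false on the JobTracker, submitting to a pool missing from the allocation file now fails at submission time with an IOException whose message lists the declared pools (see the test above). A rough client-side sketch follows, assuming the cluster configuration is on the classpath; the pool name "poolB", the pool property name "pool" (whatever mapred.fairscheduler.poolnameproperty selects on the cluster), and the input/output paths are illustrative.

    // Illustrative only: submits a trivial identity-map job to the pool "poolB"
    // and reports the rejection produced by the new check when that pool is
    // not declared in the fair scheduler allocation file.
    package org.apache.hadoop.examples; // hypothetical location for this sketch

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.lib.IdentityMapper;

    public class SubmitToPool {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        conf.setJobName("pool-name-check-demo");
        conf.setInputFormat(TextInputFormat.class);
        conf.setMapperClass(IdentityMapper.class);
        conf.setNumReduceTasks(0); // map-only job, default output types suffice
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // The property name must match mapred.fairscheduler.poolnameproperty
        // on the cluster; the test above configures it as "pool".
        conf.set("pool", "poolB");

        try {
          JobClient.runJob(conf);
        } catch (IOException ioe) {
          // With allow.undeclared.pools=false and "poolB" absent from the
          // allocation file, the submission is rejected with a message that
          // lists the valid pools.
          System.err.println("Submission failed: " + ioe.getMessage());
        }
      }
    }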