From e6f85573e706b4e03797ce4a8e997f37c0bee1a8 Mon Sep 17 00:00:00 2001
From: Arun Murthy
Date: Tue, 1 Nov 2011 03:19:48 +0000
Subject: [PATCH] Merge -c 1195792 from trunk to branch-0.23 to fix
 MAPREDUCE-3237.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1195793 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   3 +
 .../mapred/LocalClientProtocolProvider.java   |   0
 .../mapred/LocalDistributedCacheManager.java  | 215 ++++++++++++++++++
 .../apache/hadoop/mapred/LocalJobRunner.java  | 106 ++++-----
 .../hadoop/mapred/LocalJobRunnerMetrics.java  |  98 ++++++++
 ....mapreduce.protocol.ClientProtocolProvider |  14 ++
 .../mapred/TestMRWithDistributedCache.java    |  11 -
 .../src/main/resources/mapred-default.xml     |   2 +-
 .../apache/hadoop/yarn/util}/FSDownload.java  |   4 +-
 .../hadoop/yarn/util}/TestFSDownload.java     |   2 +-
 .../localizer/ContainerLocalizer.java         |   1 +
 .../ResourceLocalizationService.java          |   1 +
 ....mapreduce.protocol.ClientProtocolProvider |   1 -
 13 files changed, 386 insertions(+), 72 deletions(-)
 rename hadoop-mapreduce-project/{src => hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main}/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java (100%)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
 rename hadoop-mapreduce-project/{src => hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main}/java/org/apache/hadoop/mapred/LocalJobRunner.java (91%)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
 rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java}/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (96%)
 rename hadoop-mapreduce-project/hadoop-yarn/{hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer => hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util}/FSDownload.java (98%)
 rename hadoop-mapreduce-project/hadoop-yarn/{hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer => hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util}/TestFSDownload.java (99%)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index e7fa759b7e4..0c4bb24e0a5 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -1864,6 +1864,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3192. Fix Javadoc warning in JobClient.java and Cluster.java.
     (jitendra)
 
+    MAPREDUCE-3237. Move LocalJobRunner to hadoop-mapreduce-client-core.
+    (tomwhite via acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

diff --git a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java
similarity index 100%
rename from hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java
rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java
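A note on the rename above: nothing imports LocalClientProtocolProvider directly; the mapreduce Cluster class finds ClientProtocolProvider implementations at runtime through the JDK ServiceLoader mechanism, which is why this patch also adds a META-INF/services entry in the new module (see further down). A minimal, self-contained sketch of that discovery pattern, using an illustrative Greeter interface as a stand-in:

// ServiceLoader discovery sketch; Greeter stands in for
// org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider.
import java.util.ServiceLoader;

interface Greeter {
  String greet();
}

public class ProviderLookup {
  public static void main(String[] args) {
    // Implementations are listed one fully-qualified class name per line
    // in a classpath resource named META-INF/services/Greeter.
    for (Greeter g : ServiceLoader.load(Greeter.class)) {
      System.out.println(g.getClass().getName() + ": " + g.greet());
    }
  }
}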
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
new file mode 100644
index 00000000000..19f558c6726
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import com.google.common.collect.Maps;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.FSDownload;
+
+/**
+ * A helper class for managing the distributed cache for {@link LocalJobRunner}.
+ */
+@SuppressWarnings("deprecation")
+class LocalDistributedCacheManager {
+  public static final Log LOG =
+    LogFactory.getLog(LocalDistributedCacheManager.class);
+
+  private List<String> localArchives = new ArrayList<String>();
+  private List<String> localFiles = new ArrayList<String>();
+  private List<String> localClasspaths = new ArrayList<String>();
+
+  private boolean setupCalled = false;
+
+  /**
+   * Set up the distributed cache by localizing the resources, and updating
+   * the configuration with references to the localized resources.
+   * @param conf job configuration
+   * @throws IOException
+   */
+  public void setup(JobConf conf) throws IOException {
+    // Generate YARN local resources objects corresponding to the distributed
+    // cache configuration
+    Map<String, LocalResource> localResources =
+      new LinkedHashMap<String, LocalResource>();
+    MRApps.setupDistributedCache(conf, localResources);
+
+    // Find which resources are to be put on the local classpath
+    Map<String, Path> classpaths = new HashMap<String, Path>();
+    Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf);
+    if (archiveClassPaths != null) {
+      for (Path p : archiveClassPaths) {
+        FileSystem remoteFS = p.getFileSystem(conf);
+        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+            remoteFS.getWorkingDirectory()));
+        classpaths.put(p.toUri().getPath().toString(), p);
+      }
+    }
+    Path[] fileClassPaths = DistributedCache.getFileClassPaths(conf);
+    if (fileClassPaths != null) {
+      for (Path p : fileClassPaths) {
+        FileSystem remoteFS = p.getFileSystem(conf);
+        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+            remoteFS.getWorkingDirectory()));
+        classpaths.put(p.toUri().getPath().toString(), p);
+      }
+    }
+
+    // Localize the resources
+    LocalDirAllocator localDirAllocator =
+      new LocalDirAllocator(MRConfig.LOCAL_DIR);
+    FileContext localFSFileContext = FileContext.getLocalFSFileContext();
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+    Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
+    ExecutorService exec = Executors.newCachedThreadPool();
+    for (LocalResource resource : localResources.values()) {
+      Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
+          localDirAllocator, resource, new Random());
+      Future<Path> future = exec.submit(download);
+      resourcesToPaths.put(resource, future);
+    }
+    for (LocalResource resource : localResources.values()) {
+      Path path;
+      try {
+        path = resourcesToPaths.get(resource).get();
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+      String pathString = path.toUri().toString();
+      if (resource.getType() == LocalResourceType.ARCHIVE) {
+        localArchives.add(pathString);
+      } else if (resource.getType() == LocalResourceType.FILE) {
+        localFiles.add(pathString);
+      }
+      Path resourcePath;
+      try {
+        resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
+      } catch (URISyntaxException e) {
+        throw new IOException(e);
+      }
+      LOG.info(String.format("Localized %s as %s", resourcePath, path));
+      String cp = resourcePath.toUri().getPath();
+      if (classpaths.keySet().contains(cp)) {
+        localClasspaths.add(path.toUri().getPath().toString());
+      }
+    }
+
+    // Update the configuration object with localized data.
+    if (!localArchives.isEmpty()) {
+      conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
+          .arrayToString(localArchives.toArray(new String[localArchives
+              .size()])));
+    }
+    if (!localFiles.isEmpty()) {
+      conf.set(MRJobConfig.CACHE_LOCALFILES, StringUtils
+          .arrayToString(localFiles.toArray(new String[localFiles
+              .size()])));
+    }
+    if (DistributedCache.getSymlink(conf)) {
+      // This is not supported largely because,
+      // for a Child subprocess, the cwd in LocalJobRunner
+      // is not a fresh slate, but rather the user's working directory.
+      // This is further complicated because the logic in
+      // setupWorkDir only creates symlinks if there's a jarfile
+      // in the configuration.
+      LOG.warn("LocalJobRunner does not support " +
+          "symlinking into current working dir.");
+    }
+    setupCalled = true;
+  }
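The setup() method above fans its downloads out to a cached thread pool, then joins the Futures in submission order so any failure surfaces as an IOException. A stripped-down sketch of the same fan-out/join pattern, with stand-in strings in place of real downloads:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FanOutJoin {
  public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newCachedThreadPool();
    List<Future<String>> results = new ArrayList<Future<String>>();
    for (final String name : new String[] { "a.jar", "b.zip", "c.txt" }) {
      results.add(exec.submit(new Callable<String>() {
        @Override
        public String call() {
          return "/tmp/localized/" + name; // stand-in for a real download
        }
      }));
    }
    for (Future<String> f : results) {
      // get() blocks until the task finishes and rethrows its failure
      System.out.println("Localized: " + f.get());
    }
    exec.shutdown();
  }
}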
+
+  /**
+   * Are there resources that should be added to the classpath?
+   * Should be called after setup().
+   *
+   */
+  public boolean hasLocalClasspaths() {
+    if (!setupCalled) {
+      throw new IllegalStateException(
+          "hasLocalClasspaths() should be called after setup()");
+    }
+    return !localClasspaths.isEmpty();
+  }
+
+  /**
+   * Creates a class loader that includes the designated
+   * files and archives.
+   */
+  public ClassLoader makeClassLoader(final ClassLoader parent)
+      throws MalformedURLException {
+    final URL[] urls = new URL[localClasspaths.size()];
+    for (int i = 0; i < localClasspaths.size(); ++i) {
+      urls[i] = new File(localClasspaths.get(i)).toURI().toURL();
+      LOG.info(urls[i]);
+    }
+    return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
+      @Override
+      public ClassLoader run() {
+        return new URLClassLoader(urls, parent);
+      }
+    });
+  }
+
+  public void close() throws IOException {
+    FileContext localFSFileContext = FileContext.getLocalFSFileContext();
+    for (String archive : localArchives) {
+      localFSFileContext.delete(new Path(archive), true);
+    }
+    for (String file : localFiles) {
+      localFSFileContext.delete(new Path(file), true);
+    }
+  }
+}
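Taken together, the new class gives LocalJobRunner a small lifecycle: setup() localizes resources and rewrites the configuration, hasLocalClasspaths() and makeClassLoader() wire localized jars into a class loader, and close() deletes the localized copies. A hedged usage sketch; the class is package-private, so this only compiles inside org.apache.hadoop.mapred, and the empty JobConf is illustrative:

package org.apache.hadoop.mapred;

import java.io.IOException;

public class CacheManagerLifecycle {
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf();
    LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
    manager.setup(conf);                 // localize, rewrite conf
    try {
      if (manager.hasLocalClasspaths()) {
        Thread.currentThread().setContextClassLoader(
            manager.makeClassLoader(
                Thread.currentThread().getContextClassLoader()));
      }
      // ... run task code here ...
    } finally {
      manager.close();                   // delete the localized copies
    }
  }
}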
diff --git a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
similarity index 91%
rename from hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 6d75228d05e..c8b59ebdac3 100644
--- a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -27,8 +26,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -38,28 +37,23 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
-import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.mapreduce.server.jobtracker.State;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
@@ -67,6 +61,7 @@ import org.apache.hadoop.security.token.Token;
 /** Implements MapReduce locally, in-process, for debugging. */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
+@SuppressWarnings("deprecation")
 public class LocalJobRunner implements ClientProtocol {
   public static final Log LOG =
     LogFactory.getLog(LocalJobRunner.class);
@@ -82,7 +77,7 @@ public class LocalJobRunner implements ClientProtocol {
   private int reduce_tasks = 0;
   final Random rand = new Random();
 
-  private JobTrackerInstrumentation myMetrics = null;
+  private LocalJobRunnerMetrics myMetrics = null;
 
   private static final String jobDir = "localRunner/";
 
@@ -125,8 +120,7 @@ public class LocalJobRunner implements ClientProtocol {
     private FileSystem localFs;
     boolean killed = false;
 
-    private TrackerDistributedCacheManager trackerDistributerdCacheManager;
-    private TaskDistributedCacheManager taskDistributedCacheManager;
+    private LocalDistributedCacheManager localDistributedCacheManager;
 
     public long getProtocolVersion(String protocol, long clientVersion) {
       return TaskUmbilicalProtocol.versionID;
@@ -150,27 +144,8 @@ public class LocalJobRunner implements ClientProtocol {
 
       // Manage the distributed cache. If there are files to be copied,
       // this will trigger localFile to be re-written again.
-      this.trackerDistributerdCacheManager =
-        new TrackerDistributedCacheManager(conf, new DefaultTaskController());
-      this.taskDistributedCacheManager =
-        trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
-      taskDistributedCacheManager.setup(
-          new LocalDirAllocator(MRConfig.LOCAL_DIR),
-          new File(systemJobDir.toString()),
-          "archive", "archive");
-
-      if (DistributedCache.getSymlink(conf)) {
-        // This is not supported largely because,
-        // for a Child subprocess, the cwd in LocalJobRunner
-        // is not a fresh slate, but rather the user's working directory.
-        // This is further complicated because the logic in
-        // setupWorkDir only creates symlinks if there's a jarfile
-        // in the configuration.
-        LOG.warn("LocalJobRunner does not support " +
-            "symlinking into current working dir.");
-      }
-      // Setup the symlinks for the distributed cache.
-      TaskRunner.setupWorkDir(conf, new File(localJobDir.toUri()).getAbsoluteFile());
+      localDistributedCacheManager = new LocalDistributedCacheManager();
+      localDistributedCacheManager.setup(conf);
 
       // Write out configuration file. Instead of copying it from
       // systemJobFile, we re-write it, since setup(), above, may have
@@ -184,8 +159,8 @@ public class LocalJobRunner implements ClientProtocol {
       this.job = new JobConf(localJobFile);
 
       // Job (the current object) is a Thread, so we wrap its class loader.
-      if (!taskDistributedCacheManager.getClassPaths().isEmpty()) {
-        setContextClassLoader(taskDistributedCacheManager.makeClassLoader(
+      if (localDistributedCacheManager.hasLocalClasspaths()) {
+        setContextClassLoader(localDistributedCacheManager.makeClassLoader(
           getContextClassLoader()));
       }
 
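Because Job extends Thread, the class-loader swap in the hunk above is all it takes to make the localized classpath visible to whatever the job runs on that thread. A tiny standalone illustration of the mechanism; the empty URL list just keeps the demo self-contained:

import java.net.URL;
import java.net.URLClassLoader;

public class ContextLoaderSwap {
  public static void main(String[] args) throws Exception {
    Thread t = new Thread() {
      @Override
      public void run() {
        // Task code typically resolves user classes via the context loader.
        System.out.println("Running with loader: "
            + Thread.currentThread().getContextClassLoader());
      }
    };
    // Wrap the current loader, the same shape as Job's setContextClassLoader call.
    t.setContextClassLoader(new URLClassLoader(new URL[0],
        ContextLoaderSwap.class.getClassLoader()));
    t.start();
    t.join();
  }
}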
@@ -200,10 +175,6 @@ public class LocalJobRunner implements ClientProtocol {
       this.start();
     }
 
-    JobProfile getProfile() {
-      return profile;
-    }
-
     /**
      * A Runnable instance that handles a map task to be run by an executor.
      */
@@ -239,7 +210,7 @@
                                info.getSplitIndex(), 1);
         map.setUser(UserGroupInformation.getCurrentUser().
             getShortUserName());
-        TaskRunner.setupChildMapredLocalDirs(map, localConf);
+        setupChildMapredLocalDirs(map, localConf);
 
         MapOutputFile mapOutput = new MROutputFiles();
         mapOutput.setConf(localConf);
@@ -333,7 +304,6 @@
       return executor;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void run() {
       JobID jobId = profile.getJobID();
@@ -399,7 +369,7 @@
               getShortUserName());
           JobConf localConf = new JobConf(job);
           localConf.set("mapreduce.jobtracker.address", "local");
-          TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
+          setupChildMapredLocalDirs(reduce, localConf);
           // move map output to reduce input
           for (int i = 0; i < mapIds.size(); i++) {
             if (!this.isInterrupted()) {
@@ -473,8 +443,7 @@
         fs.delete(systemJobFile.getParent(), true);  // delete submit dir
         localFs.delete(localJobFile, true);              // delete local copy
         // Cleanup distributed cache
-        taskDistributedCacheManager.release();
-        trackerDistributerdCacheManager.purgeCache();
+        localDistributedCacheManager.close();
       } catch (IOException e) {
         LOG.warn("Error cleaning up "+id+": "+e);
       }
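The cleanup hunk above collapses two tracker-cache teardown calls into a single localDistributedCacheManager.close(), which recursively deletes each localized path through FileContext. A small standalone sketch of that delete idiom; the scratch path is made up:

import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class LocalCleanup {
  public static void main(String[] args) throws Exception {
    FileContext localFs = FileContext.getLocalFSFileContext();
    Path scratch = new Path("/tmp/localized-demo");
    localFs.mkdir(scratch, FsPermission.getDefault(), true);
    localFs.delete(scratch, true); // recursive, the same call close() makes
    System.out.println("Deleted " + scratch);
  }
}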
@@ -593,7 +562,7 @@ public class LocalJobRunner implements ClientProtocol {
   public LocalJobRunner(JobConf conf) throws IOException {
     this.fs = FileSystem.getLocal(conf);
     this.conf = conf;
-    myMetrics = new JobTrackerMetricsInst(null, new JobConf(conf));
+    myMetrics = new LocalJobRunnerMetrics(new JobConf(conf));
   }
 
   // JobSubmissionProtocol methods
@@ -661,14 +630,6 @@
         reduce_tasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
   }
 
-  /**
-   * @deprecated Use {@link #getJobTrackerStatus()} instead.
-   */
-  @Deprecated
-  public State getJobTrackerState() throws IOException, InterruptedException {
-    return State.RUNNING;
-  }
-
   public JobTrackerStatus getJobTrackerStatus() {
     return JobTrackerStatus.RUNNING;
   }
@@ -723,7 +684,7 @@
   }
 
   /**
-   * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getQueueAdmins()
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getQueueAdmins(String)
    */
   public AccessControlList getQueueAdmins(String queueName) throws IOException {
     return new AccessControlList(" ");// no queue admins for local job runner
@@ -820,4 +781,37 @@
       throws IOException, InterruptedException {
     throw new UnsupportedOperationException("Not supported");
   }
+
+  static void setupChildMapredLocalDirs(Task t, JobConf conf) {
+    String[] localDirs = conf.getTrimmedStrings(MRConfig.LOCAL_DIR);
+    String jobId = t.getJobID().toString();
+    String taskId = t.getTaskID().toString();
+    boolean isCleanup = t.isTaskCleanupTask();
+    String user = t.getUser();
+    StringBuffer childMapredLocalDir =
+        new StringBuffer(localDirs[0] + Path.SEPARATOR
+            + getLocalTaskDir(user, jobId, taskId, isCleanup));
+    for (int i = 1; i < localDirs.length; i++) {
+      childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
+          + getLocalTaskDir(user, jobId, taskId, isCleanup));
+    }
+    LOG.debug(MRConfig.LOCAL_DIR + " for child : " + childMapredLocalDir);
+    conf.set(MRConfig.LOCAL_DIR, childMapredLocalDir.toString());
+  }
+
+  static final String TASK_CLEANUP_SUFFIX = ".cleanup";
+  static final String SUBDIR = jobDir;
+  static final String JOBCACHE = "jobcache";
+
+  static String getLocalTaskDir(String user, String jobid, String taskid,
+      boolean isCleanupAttempt) {
+    String taskDir = SUBDIR + Path.SEPARATOR + user + Path.SEPARATOR + JOBCACHE
+        + Path.SEPARATOR + jobid + Path.SEPARATOR + taskid;
+    if (isCleanupAttempt) {
+      taskDir = taskDir + TASK_CLEANUP_SUFFIX;
+    }
+    return taskDir;
+  }
+
 }
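The statics appended above reproduce the old TaskRunner logic: every configured local dir receives the same per-task subpath, localRunner/<user>/jobcache/<jobid>/<taskid>[.cleanup], and the results are re-joined with commas into MRConfig.LOCAL_DIR. A plain-Java sketch of the path construction, with made-up IDs:

public class LocalTaskDirs {
  public static void main(String[] args) {
    String[] localDirs = { "/disk1/mapred/local", "/disk2/mapred/local" };
    String user = "alice";
    String jobId = "job_local_0001";
    String taskId = "attempt_local_0001_m_000000_0";
    boolean isCleanup = false;

    String taskDir = "localRunner/" + user + "/jobcache/" + jobId + "/"
        + taskId + (isCleanup ? ".cleanup" : "");

    StringBuilder childLocalDir = new StringBuilder();
    for (String dir : localDirs) {
      if (childLocalDir.length() > 0) {
        childLocalDir.append(",");
      }
      childLocalDir.append(dir).append("/").append(taskDir);
    }
    // This comma-separated list is what setupChildMapredLocalDirs writes
    // back into MRConfig.LOCAL_DIR.
    System.out.println(childLocalDir);
  }
}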
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java
new file mode 100644
index 00000000000..aec70edefc2
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+
+@SuppressWarnings("deprecation")
+class LocalJobRunnerMetrics implements Updater {
+  private final MetricsRecord metricsRecord;
+
+  private int numMapTasksLaunched = 0;
+  private int numMapTasksCompleted = 0;
+  private int numReduceTasksLaunched = 0;
+  private int numReduceTasksCompleted = 0;
+  private int numWaitingMaps = 0;
+  private int numWaitingReduces = 0;
+
+  public LocalJobRunnerMetrics(JobConf conf) {
+    String sessionId = conf.getSessionId();
+    // Initiate JVM Metrics
+    JvmMetrics.init("JobTracker", sessionId);
+    // Create a record for map-reduce metrics
+    MetricsContext context = MetricsUtil.getContext("mapred");
+    // record name is jobtracker for compatibility
+    metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
+    metricsRecord.setTag("sessionId", sessionId);
+    context.registerUpdater(this);
+  }
+
+  /**
+   * Since this object is a registered updater, this method will be called
+   * periodically, e.g. every 5 seconds.
+   */
+  public void doUpdates(MetricsContext unused) {
+    synchronized (this) {
+      metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
+      metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
+      metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
+      metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
+      metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
+      metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
+
+      numMapTasksLaunched = 0;
+      numMapTasksCompleted = 0;
+      numReduceTasksLaunched = 0;
+      numReduceTasksCompleted = 0;
+      numWaitingMaps = 0;
+      numWaitingReduces = 0;
+    }
+    metricsRecord.update();
+  }
+
+  public synchronized void launchMap(TaskAttemptID taskAttemptID) {
+    ++numMapTasksLaunched;
+    decWaitingMaps(taskAttemptID.getJobID(), 1);
+  }
+
+  public synchronized void completeMap(TaskAttemptID taskAttemptID) {
+    ++numMapTasksCompleted;
+  }
+
+  public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
+    ++numReduceTasksLaunched;
+    decWaitingReduces(taskAttemptID.getJobID(), 1);
+  }
+
+  public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
+    ++numReduceTasksCompleted;
+  }
+
+  private synchronized void decWaitingMaps(JobID id, int task) {
+    numWaitingMaps -= task;
+  }
+
+  private synchronized void decWaitingReduces(JobID id, int task){
+    numWaitingReduces -= task;
+  }
+
+}
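LocalJobRunnerMetrics follows the metrics-v1 Updater contract: callers bump counters under the object lock, and the MetricsContext periodically invokes doUpdates(), which publishes the deltas and zeroes them so each report covers only new events. A framework-free sketch of that accumulate-and-flush pattern:

public class DeltaCounter {
  private int mapsLaunched = 0;

  public synchronized void launchMap() {
    ++mapsLaunched;
  }

  // Called periodically, the way MetricsContext calls doUpdates().
  public void flush() {
    int delta;
    synchronized (this) {
      delta = mapsLaunched;
      mapsLaunched = 0; // reset so the next flush reports only new events
    }
    System.out.println("maps_launched += " + delta);
  }

  public static void main(String[] args) {
    DeltaCounter c = new DeltaCounter();
    c.launchMap();
    c.launchMap();
    c.flush(); // maps_launched += 2
    c.flush(); // maps_launched += 0
  }
}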
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
new file mode 100644
index 00000000000..5b8dfdcb4ef
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
@@ -0,0 +1,14 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+org.apache.hadoop.mapred.LocalClientProtocolProvider
diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
similarity index 96%
rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
index 8b1691a46d6..ed89bf9fd4f 100644
--- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
@@ -163,17 +163,6 @@ public class TestMRWithDistributedCache extends TestCase {
     testWithConf(c);
   }
 
-  /** Tests using a full MiniMRCluster. */
-  public void testMiniMRJobRunner() throws Exception {
-    MiniMRCluster m = new MiniMRCluster(1, "file:///", 1);
-    try {
-      testWithConf(m.createJobConf());
-    } finally {
-      m.shutdown();
-    }
-
-  }
-
   private Path createTempFile(String filename, String contents)
       throws IOException {
     Path path = new Path(TEST_ROOT_DIR, filename);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index ccb3b46aa7f..9bc03caa563 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1166,7 +1166,7 @@
 
 <property>
   <name>mapreduce.framework.name</name>
-  <value>yarn</value>
+  <value>local</value>
   <description>The runtime framework for executing MapReduce jobs.
   Can be one of local, classic or yarn.
   </description>
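The mapred-default.xml change makes local the default framework on this branch; clients can still choose explicitly. A hedged client-side sketch (the job is a stub and never submitted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class PickFramework {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // One of "local", "classic" or "yarn", per the description above.
    conf.set("mapreduce.framework.name", "local");
    Job job = new Job(conf, "demo");
    System.out.println("Framework: "
        + job.getConfiguration().get("mapreduce.framework.name"));
  }
}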
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
similarity index 98%
rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java
rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
index 85d16907016..cccb140d99b 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+package org.apache.hadoop.yarn.util;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -65,7 +65,7 @@ public class FSDownload implements Callable<Path> {
   static final FsPermission PUBLIC_DIR_PERMS = new FsPermission((short) 0755);
   static final FsPermission PRIVATE_DIR_PERMS = new FsPermission((short) 0700);
 
-  FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
+  public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
       LocalDirAllocator dirs, LocalResource resource, Random rand) {
     this.conf = conf;
     this.dirs = dirs;
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
similarity index 99%
rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java
rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
index 74aa57052c0..b7237bdefc2 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+package org.apache.hadoop.yarn.util;
 
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
index 95f85d2d8c3..392128733fb 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.secu
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.FSDownload;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
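FSDownload's constructor is made public in the diff above because the class now lives in hadoop-yarn-common and is constructed from the MapReduce client module as well as the nodemanager classes just shown. A hedged sketch mirroring the call site in LocalDistributedCacheManager.setup(); FSDownloadUsage and its helper are illustrative, and the LocalResource is passed in because building one inline needs YARN record factories:

import java.util.Random;
import java.util.concurrent.Callable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.util.FSDownload;

public class FSDownloadUsage {
  // Returns a Callable ready to hand to an ExecutorService, as setup() does.
  static Callable<Path> newDownload(Configuration conf, LocalResource resource)
      throws Exception {
    return new FSDownload(FileContext.getLocalFSFileContext(),
        UserGroupInformation.getCurrentUser(), conf,
        new LocalDirAllocator(MRConfig.LOCAL_DIR), resource, new Random());
  }
}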
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 1b0fe8f538a..9ec83cdbc55 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -110,6 +110,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProv
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.FSDownload;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
diff --git a/hadoop-mapreduce-project/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/hadoop-mapreduce-project/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
index 1a54e30048d..0b4d2302ef5 100644
--- a/hadoop-mapreduce-project/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
+++ b/hadoop-mapreduce-project/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
@@ -12,4 +12,3 @@
 # limitations under the License.
 #
 org.apache.hadoop.mapred.JobTrackerClientProtocolProvider
-org.apache.hadoop.mapred.LocalClientProtocolProvider