MAPREDUCE-6238. MR2 can't run local jobs with -libjars command options which is a regression from MR1 (zxu via rkanter)

This commit is contained in:
Robert Kanter 2015-04-20 14:14:08 -07:00
parent f967fd2f21
commit d50e8f0928
4 changed files with 96 additions and 7 deletions

View File

@ -334,6 +334,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6266. Job#getTrackingURL should consistently return a proper URL MAPREDUCE-6266. Job#getTrackingURL should consistently return a proper URL
(rchiang via rkanter) (rchiang via rkanter)
MAPREDUCE-6238. MR2 can't run local jobs with -libjars command options
which is a regression from MR1 (zxu via rkanter)
Release 2.7.1 - UNRELEASED Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -100,18 +100,12 @@ class LocalDistributedCacheManager {
Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf); Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf);
if (archiveClassPaths != null) { if (archiveClassPaths != null) {
for (Path p : archiveClassPaths) { for (Path p : archiveClassPaths) {
FileSystem remoteFS = p.getFileSystem(conf);
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
classpaths.put(p.toUri().getPath().toString(), p); classpaths.put(p.toUri().getPath().toString(), p);
} }
} }
Path[] fileClassPaths = DistributedCache.getFileClassPaths(conf); Path[] fileClassPaths = DistributedCache.getFileClassPaths(conf);
if (fileClassPaths != null) { if (fileClassPaths != null) {
for (Path p : fileClassPaths) { for (Path p : fileClassPaths) {
FileSystem remoteFS = p.getFileSystem(conf);
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
classpaths.put(p.toUri().getPath().toString(), p); classpaths.put(p.toUri().getPath().toString(), p);
} }
} }

View File

@ -127,7 +127,7 @@ class JobResourceUploader {
Path tmp = new Path(tmpjars); Path tmp = new Path(tmpjars);
Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication); Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
DistributedCache.addFileToClassPath( DistributedCache.addFileToClassPath(
new Path(newPath.toUri().getPath()), conf); new Path(newPath.toUri().getPath()), conf, jtFs);
} }
} }

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* check for the job submission options of
* -jt local -libjars
*/
public class TestLocalJobSubmission {
private static Path TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data","/tmp"));
@Before
public void configure() throws Exception {
}
@After
public void cleanup() {
}
/**
* test the local job submission options of
* -jt local -libjars
* @throws IOException
*/
@Test
public void testLocalJobLibjarsOption() throws IOException {
Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://testcluster");
final String[] args = {
"-jt" , "local", "-libjars", jarPath.toString(),
"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
};
int res = -1;
try {
res = ToolRunner.run(conf, new SleepJob(), args);
} catch (Exception e) {
System.out.println("Job failed with " + e.getLocalizedMessage());
e.printStackTrace(System.out);
fail("Job failed");
}
assertEquals("dist job res is not 0:", 0, res);
}
private Path makeJar(Path p) throws IOException {
FileOutputStream fos = new FileOutputStream(new File(p.toString()));
JarOutputStream jos = new JarOutputStream(fos);
ZipEntry ze = new ZipEntry("test.jar.inside");
jos.putNextEntry(ze);
jos.write(("inside the jar!").getBytes());
jos.closeEntry();
jos.close();
return p;
}
}