svn merge -c 1331440. FIXES: HADOOP-8305. distcp over viewfs is broken (John George via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1331441 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-04-27 14:41:38 +00:00
parent be4dfe86af
commit 0581c7e598
8 changed files with 502 additions and 15 deletions

View File

@ -360,6 +360,8 @@ Release 0.23.3 - UNRELEASED
HADOOP-8227. Allow RPC to limit ephemeral port range. (bobby) HADOOP-8227. Allow RPC to limit ephemeral port range. (bobby)
HADOOP-8305. distcp over viewfs is broken (John George via bobby)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
NEW FEATURES NEW FEATURES

View File

@ -1050,9 +1050,9 @@ public class SequenceFile {
int bufferSize = bufferSizeOption == null ? getBufferSize(conf) : int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
bufferSizeOption.getValue(); bufferSizeOption.getValue();
short replication = replicationOption == null ? short replication = replicationOption == null ?
fs.getDefaultReplication() : fs.getDefaultReplication(p) :
(short) replicationOption.getValue(); (short) replicationOption.getValue();
long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize() : long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
blockSizeOption.getValue(); blockSizeOption.getValue();
Progressable progress = progressOption == null ? null : Progressable progress = progressOption == null ? null :
progressOption.getValue(); progressOption.getValue();

View File

@ -613,7 +613,7 @@ public class HadoopArchives implements Tool {
destFs.delete(tmpOutput, false); destFs.delete(tmpOutput, false);
} }
partStream = destFs.create(tmpOutput, false, conf.getInt("io.file.buffer.size", 4096), partStream = destFs.create(tmpOutput, false, conf.getInt("io.file.buffer.size", 4096),
destFs.getDefaultReplication(), blockSize); destFs.getDefaultReplication(tmpOutput), blockSize);
} catch(IOException ie) { } catch(IOException ie) {
throw new RuntimeException("Unable to open output file " + tmpOutput, ie); throw new RuntimeException("Unable to open output file " + tmpOutput, ie);
} }

View File

@ -107,8 +107,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
throws IOException { throws IOException {
OutputStream outStream = new BufferedOutputStream(targetFS.create( OutputStream outStream = new BufferedOutputStream(targetFS.create(
tmpTargetPath, true, BUFFER_SIZE, tmpTargetPath, true, BUFFER_SIZE,
getReplicationFactor(fileAttributes, sourceFileStatus, targetFS), getReplicationFactor(fileAttributes, sourceFileStatus, targetFS, tmpTargetPath),
getBlockSize(fileAttributes, sourceFileStatus, targetFS), context)); getBlockSize(fileAttributes, sourceFileStatus, targetFS, tmpTargetPath), context));
return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, true, context); return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, true, context);
} }
@ -218,16 +218,16 @@ public class RetriableFileCopyCommand extends RetriableCommand {
private static short getReplicationFactor( private static short getReplicationFactor(
EnumSet<FileAttribute> fileAttributes, EnumSet<FileAttribute> fileAttributes,
FileStatus sourceFile, FileSystem targetFS) { FileStatus sourceFile, FileSystem targetFS, Path tmpTargetPath) {
return fileAttributes.contains(FileAttribute.REPLICATION)? return fileAttributes.contains(FileAttribute.REPLICATION)?
sourceFile.getReplication() : targetFS.getDefaultReplication(); sourceFile.getReplication() : targetFS.getDefaultReplication(tmpTargetPath);
} }
private static long getBlockSize( private static long getBlockSize(
EnumSet<FileAttribute> fileAttributes, EnumSet<FileAttribute> fileAttributes,
FileStatus sourceFile, FileSystem targetFS) { FileStatus sourceFile, FileSystem targetFS, Path tmpTargetPath) {
return fileAttributes.contains(FileAttribute.BLOCKSIZE)? return fileAttributes.contains(FileAttribute.BLOCKSIZE)?
sourceFile.getBlockSize() : targetFS.getDefaultBlockSize(); sourceFile.getBlockSize() : targetFS.getDefaultBlockSize(tmpTargetPath);
} }
/** /**

View File

@ -110,9 +110,9 @@ public class TestDistCp {
fs = cluster.getFileSystem(); fs = cluster.getFileSystem();
final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(), final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
fs.getWorkingDirectory()); fs.getWorkingDirectory());
final long blockSize = fs.getDefaultBlockSize() * 2; final long blockSize = fs.getDefaultBlockSize(new Path(path)) * 2;
outputStream = fs.create(qualifiedPath, true, 0, outputStream = fs.create(qualifiedPath, true, 0,
(short)(fs.getDefaultReplication()*2), (short)(fs.getDefaultReplication(new Path(path))*2),
blockSize); blockSize);
outputStream.write(new byte[FILE_SIZE]); outputStream.write(new byte[FILE_SIZE]);
pathList.add(qualifiedPath); pathList.add(qualifiedPath);

View File

@ -0,0 +1,485 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.viewfs.*;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.util.TestDistCpUtils;
import org.apache.hadoop.fs.FsConstants;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
public class TestDistCpViewFs {
private static final Log LOG = LogFactory.getLog(TestDistCpViewFs.class);
private static FileSystem fs;
private static Path listFile;
private static Path target;
private static String root;
private static Configuration getConf() throws URISyntaxException {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "local");
conf.set("fs.default.name", "file:///");
return conf;
}
@BeforeClass
public static void setup() throws URISyntaxException{
try {
Path fswd = FileSystem.get(getConf()).getWorkingDirectory();
Configuration vConf = ViewFileSystemTestSetup.createConfig();
ConfigUtil.addLink(vConf, "/usr", new URI(fswd.toString()));
fs = FileSystem.get(FsConstants.VIEWFS_URI, vConf);
fs.setWorkingDirectory(new Path("/usr"));
listFile = new Path("target/tmp/listing").makeQualified(fs.getUri(),
fs.getWorkingDirectory());
target = new Path("target/tmp/target").makeQualified(fs.getUri(),
fs.getWorkingDirectory());
root = new Path("target/tmp").makeQualified(fs.getUri(),
fs.getWorkingDirectory()).toString();
TestDistCpUtils.delete(fs, root);
} catch (IOException e) {
LOG.error("Exception encountered ", e);
}
}
@Test
public void testSingleFileMissingTarget() {
caseSingleFileMissingTarget(false);
caseSingleFileMissingTarget(true);
}
private void caseSingleFileMissingTarget(boolean sync) {
try {
addEntries(listFile, "singlefile1/file1");
createFiles("singlefile1/file1");
runTest(listFile, target, sync);
checkResult(target, 1);
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test
public void testSingleFileTargetFile() {
caseSingleFileTargetFile(false);
caseSingleFileTargetFile(true);
}
private void caseSingleFileTargetFile(boolean sync) {
try {
addEntries(listFile, "singlefile1/file1");
createFiles("singlefile1/file1", target.toString());
runTest(listFile, target, sync);
checkResult(target, 1);
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test
public void testSingleFileTargetDir() {
caseSingleFileTargetDir(false);
caseSingleFileTargetDir(true);
}
private void caseSingleFileTargetDir(boolean sync) {
try {
addEntries(listFile, "singlefile2/file2");
createFiles("singlefile2/file2");
mkdirs(target.toString());
runTest(listFile, target, sync);
checkResult(target, 1, "file2");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test
public void testSingleDirTargetMissing() {
caseSingleDirTargetMissing(false);
caseSingleDirTargetMissing(true);
}
private void caseSingleDirTargetMissing(boolean sync) {
try {
addEntries(listFile, "singledir");
mkdirs(root + "/singledir/dir1");
runTest(listFile, target, sync);
checkResult(target, 1, "dir1");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test
public void testSingleDirTargetPresent() {
try {
addEntries(listFile, "singledir");
mkdirs(root + "/singledir/dir1");
mkdirs(target.toString());
runTest(listFile, target, false);
checkResult(target, 1, "singledir/dir1");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test
public void testUpdateSingleDirTargetPresent() {
try {
addEntries(listFile, "Usingledir");
mkdirs(root + "/Usingledir/Udir1");
mkdirs(target.toString());
runTest(listFile, target, true);
checkResult(target, 1, "Udir1");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test
public void testMultiFileTargetPresent() {
caseMultiFileTargetPresent(false);
caseMultiFileTargetPresent(true);
}
private void caseMultiFileTargetPresent(boolean sync) {
try {
addEntries(listFile, "multifile/file3", "multifile/file4", "multifile/file5");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
mkdirs(target.toString());
runTest(listFile, target, sync);
checkResult(target, 3, "file3", "file4", "file5");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test
public void testMultiFileTargetMissing() {
caseMultiFileTargetMissing(false);
caseMultiFileTargetMissing(true);
}
private void caseMultiFileTargetMissing(boolean sync) {
try {
addEntries(listFile, "multifile/file3", "multifile/file4", "multifile/file5");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
runTest(listFile, target, sync);
checkResult(target, 3, "file3", "file4", "file5");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test
public void testMultiDirTargetPresent() {
try {
addEntries(listFile, "multifile", "singledir");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
mkdirs(target.toString(), root + "/singledir/dir1");
runTest(listFile, target, false);
checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5", "singledir/dir1");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test
public void testUpdateMultiDirTargetPresent() {
try {
addEntries(listFile, "Umultifile", "Usingledir");
createFiles("Umultifile/Ufile3", "Umultifile/Ufile4", "Umultifile/Ufile5");
mkdirs(target.toString(), root + "/Usingledir/Udir1");
runTest(listFile, target, true);
checkResult(target, 4, "Ufile3", "Ufile4", "Ufile5", "Udir1");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test
public void testMultiDirTargetMissing() {
try {
addEntries(listFile, "multifile", "singledir");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
mkdirs(root + "/singledir/dir1");
runTest(listFile, target, false);
checkResult(target, 2, "multifile/file3", "multifile/file4",
"multifile/file5", "singledir/dir1");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test
public void testUpdateMultiDirTargetMissing() {
try {
addEntries(listFile, "multifile", "singledir");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
mkdirs(root + "/singledir/dir1");
runTest(listFile, target, true);
checkResult(target, 4, "file3", "file4", "file5", "dir1");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test
public void testGlobTargetMissingSingleLevel() {
try {
Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
fs.getWorkingDirectory());
addEntries(listFile, "*");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
createFiles("singledir/dir2/file6");
runTest(listFile, target, false);
checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5",
"singledir/dir2/file6");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
TestDistCpUtils.delete(fs, "target/tmp1");
}
}
@Test
public void testUpdateGlobTargetMissingSingleLevel() {
try {
Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
fs.getWorkingDirectory());
addEntries(listFile, "*");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
createFiles("singledir/dir2/file6");
runTest(listFile, target, true);
checkResult(target, 4, "file3", "file4", "file5", "dir2/file6");
} catch (IOException e) {
LOG.error("Exception encountered while running distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
TestDistCpUtils.delete(fs, "target/tmp1");
}
}
@Test
public void testGlobTargetMissingMultiLevel() {
try {
Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
fs.getWorkingDirectory());
addEntries(listFile, "*/*");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
createFiles("singledir1/dir3/file7", "singledir1/dir3/file8",
"singledir1/dir3/file9");
runTest(listFile, target, false);
checkResult(target, 4, "file3", "file4", "file5",
"dir3/file7", "dir3/file8", "dir3/file9");
} catch (IOException e) {
LOG.error("Exception encountered while running distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
TestDistCpUtils.delete(fs, "target/tmp1");
}
}
@Test
public void testUpdateGlobTargetMissingMultiLevel() {
try {
Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
fs.getWorkingDirectory());
addEntries(listFile, "*/*");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
createFiles("singledir1/dir3/file7", "singledir1/dir3/file8",
"singledir1/dir3/file9");
runTest(listFile, target, true);
checkResult(target, 6, "file3", "file4", "file5",
"file7", "file8", "file9");
} catch (IOException e) {
LOG.error("Exception encountered while running distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
TestDistCpUtils.delete(fs, "target/tmp1");
}
}
private void addEntries(Path listFile, String... entries) throws IOException {
OutputStream out = fs.create(listFile);
try {
for (String entry : entries){
out.write((root + "/" + entry).getBytes());
out.write("\n".getBytes());
}
} finally {
out.close();
}
}
private void createFiles(String... entries) throws IOException {
String e;
for (String entry : entries){
if ((new Path(entry)).isAbsolute())
{
e = entry;
}
else
{
e = root + "/" + entry;
}
OutputStream out = fs.create(new Path(e));
try {
out.write((e).getBytes());
out.write("\n".getBytes());
} finally {
out.close();
}
}
}
private void mkdirs(String... entries) throws IOException {
for (String entry : entries){
fs.mkdirs(new Path(entry));
}
}
private void runTest(Path listFile, Path target, boolean sync) throws IOException {
DistCpOptions options = new DistCpOptions(listFile, target);
options.setSyncFolder(sync);
try {
new DistCp(getConf(), options).execute();
} catch (Exception e) {
LOG.error("Exception encountered ", e);
throw new IOException(e);
}
}
private void checkResult(Path target, int count, String... relPaths) throws IOException {
Assert.assertEquals(count, fs.listStatus(target).length);
if (relPaths == null || relPaths.length == 0) {
Assert.assertTrue(target.toString(), fs.exists(target));
return;
}
for (String relPath : relPaths) {
Assert.assertTrue(new Path(target, relPath).toString(), fs.exists(new Path(target, relPath)));
}
}
}

View File

@ -127,9 +127,9 @@ public class TestCopyMapper {
fs = cluster.getFileSystem(); fs = cluster.getFileSystem();
final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(), final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
fs.getWorkingDirectory()); fs.getWorkingDirectory());
final long blockSize = fs.getDefaultBlockSize() * 2; final long blockSize = fs.getDefaultBlockSize(qualifiedPath) * 2;
outputStream = fs.create(qualifiedPath, true, 0, outputStream = fs.create(qualifiedPath, true, 0,
(short)(fs.getDefaultReplication()*2), (short)(fs.getDefaultReplication(qualifiedPath)*2),
blockSize); blockSize);
outputStream.write(new byte[FILE_SIZE]); outputStream.write(new byte[FILE_SIZE]);
pathList.add(qualifiedPath); pathList.add(qualifiedPath);

View File

@ -374,9 +374,9 @@ public class DistCp implements Tool {
FsPermission permission = preseved.contains(FileAttribute.PERMISSION)? FsPermission permission = preseved.contains(FileAttribute.PERMISSION)?
srcstat.getPermission(): null; srcstat.getPermission(): null;
short replication = preseved.contains(FileAttribute.REPLICATION)? short replication = preseved.contains(FileAttribute.REPLICATION)?
srcstat.getReplication(): destFileSys.getDefaultReplication(); srcstat.getReplication(): destFileSys.getDefaultReplication(f);
long blockSize = preseved.contains(FileAttribute.BLOCK_SIZE)? long blockSize = preseved.contains(FileAttribute.BLOCK_SIZE)?
srcstat.getBlockSize(): destFileSys.getDefaultBlockSize(); srcstat.getBlockSize(): destFileSys.getDefaultBlockSize(f);
return destFileSys.create(f, permission, true, sizeBuf, replication, return destFileSys.create(f, permission, true, sizeBuf, replication,
blockSize, reporter); blockSize, reporter);
} }