svn merge -c 1242389 from trunk to branch 0.23 fixes HADOOP-8042 When copying a file out of HDFS, modifying it, and uploading it back into HDFS, the put fails due to a CRC mismatch

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1242394 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-02-09 16:48:35 +00:00
parent 6ebceaeede
commit cdd7a1f5c4
9 changed files with 159 additions and 47 deletions

View File

@ -12,6 +12,10 @@ Release 0.23.2 - UNRELEASED
BUG FIXES BUG FIXES
HADOOP-8042 When copying a file out of HDFS, modifying it, and uploading
it back into HDFS, the put fails due to a CRC mismatch
(Daryn Sharp via bobby)
Release 0.23.1 - 2012-02-08 Release 0.23.1 - 2012-02-08
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -43,6 +43,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0}; private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
private int bytesPerChecksum = 512; private int bytesPerChecksum = 512;
private boolean verifyChecksum = true; private boolean verifyChecksum = true;
private boolean writeChecksum = true;
public static double getApproxChkSumLength(long size) { public static double getApproxChkSumLength(long size) {
return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size; return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
@ -67,6 +68,11 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
this.verifyChecksum = verifyChecksum; this.verifyChecksum = verifyChecksum;
} }
@Override
public void setWriteChecksum(boolean writeChecksum) {
this.writeChecksum = writeChecksum;
}
/** get the raw file system */ /** get the raw file system */
public FileSystem getRawFileSystem() { public FileSystem getRawFileSystem() {
return fs; return fs;
@ -428,9 +434,20 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
throw new IOException("Mkdirs failed to create " + parent); throw new IOException("Mkdirs failed to create " + parent);
} }
} }
final FSDataOutputStream out = new FSDataOutputStream( final FSDataOutputStream out;
new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication, if (writeChecksum) {
blockSize, progress), null); out = new FSDataOutputStream(
new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
blockSize, progress), null);
} else {
out = fs.create(f, permission, overwrite, bufferSize, replication,
blockSize, progress);
// remove the checksum file since we aren't writing one
Path checkFile = getChecksumFile(f);
if (fs.exists(checkFile)) {
fs.delete(checkFile, true);
}
}
if (permission != null) { if (permission != null) {
setPermission(f, permission); setPermission(f, permission);
} }

View File

@ -1936,6 +1936,15 @@ public abstract class FileSystem extends Configured implements Closeable {
//doesn't do anything //doesn't do anything
} }
/**
* Set the write checksum flag. This is only applicable if the
* corresponding FileSystem supports checksum. By default doesn't do anything.
* @param writeChecksum
*/
public void setWriteChecksum(boolean writeChecksum) {
//doesn't do anything
}
/** /**
* Return a list of file status objects that corresponds to the list of paths * Return a list of file status objects that corresponds to the list of paths
* excluding those non-existent paths. * excluding those non-existent paths.

View File

@ -360,6 +360,11 @@ public class FilterFileSystem extends FileSystem {
public void setVerifyChecksum(boolean verifyChecksum) { public void setVerifyChecksum(boolean verifyChecksum) {
fs.setVerifyChecksum(verifyChecksum); fs.setVerifyChecksum(verifyChecksum);
} }
@Override
public void setWriteChecksum(boolean writeChecksum) {
fs.setVerifyChecksum(writeChecksum);
}
@Override @Override
public Configuration getConf() { public Configuration getConf() {

View File

@ -41,7 +41,9 @@ import org.apache.hadoop.io.IOUtils;
*/ */
abstract class CommandWithDestination extends FsCommand { abstract class CommandWithDestination extends FsCommand {
protected PathData dst; protected PathData dst;
protected boolean overwrite = false; private boolean overwrite = false;
private boolean verifyChecksum = true;
private boolean writeChecksum = true;
/** /**
* *
@ -53,6 +55,14 @@ abstract class CommandWithDestination extends FsCommand {
overwrite = flag; overwrite = flag;
} }
protected void setVerifyChecksum(boolean flag) {
verifyChecksum = flag;
}
protected void setWriteChecksum(boolean flag) {
writeChecksum = flag;
}
/** /**
* The last arg is expected to be a local path, if only one argument is * The last arg is expected to be a local path, if only one argument is
* given then the destination will be the current directory * given then the destination will be the current directory
@ -201,6 +211,7 @@ abstract class CommandWithDestination extends FsCommand {
* @throws IOException if copy fails * @throws IOException if copy fails
*/ */
protected void copyFileToTarget(PathData src, PathData target) throws IOException { protected void copyFileToTarget(PathData src, PathData target) throws IOException {
src.fs.setVerifyChecksum(verifyChecksum);
copyStreamToTarget(src.fs.open(src.path), target); copyStreamToTarget(src.fs.open(src.path), target);
} }
@ -217,6 +228,7 @@ abstract class CommandWithDestination extends FsCommand {
if (target.exists && (target.stat.isDirectory() || !overwrite)) { if (target.exists && (target.stat.isDirectory() || !overwrite)) {
throw new PathExistsException(target.toString()); throw new PathExistsException(target.toString());
} }
target.fs.setWriteChecksum(writeChecksum);
PathData tempFile = null; PathData tempFile = null;
try { try {
tempFile = target.createTempFile(target+"._COPYING_"); tempFile = target.createTempFile(target+"._COPYING_");

View File

@ -25,7 +25,6 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
/** Various commands for copy files */ /** Various commands for copy files */
@ -103,43 +102,17 @@ class CopyCommands {
"to the local name. <src> is kept. When copying multiple,\n" + "to the local name. <src> is kept. When copying multiple,\n" +
"files, the destination must be a directory."; "files, the destination must be a directory.";
/**
* The prefix for the tmp file used in copyToLocal.
* It must be at least three characters long, required by
* {@link java.io.File#createTempFile(String, String, File)}.
*/
private boolean copyCrc;
private boolean verifyChecksum;
@Override @Override
protected void processOptions(LinkedList<String> args) protected void processOptions(LinkedList<String> args)
throws IOException { throws IOException {
CommandFormat cf = new CommandFormat( CommandFormat cf = new CommandFormat(
1, Integer.MAX_VALUE, "crc", "ignoreCrc"); 1, Integer.MAX_VALUE, "crc", "ignoreCrc");
cf.parse(args); cf.parse(args);
copyCrc = cf.getOpt("crc"); setWriteChecksum(cf.getOpt("crc"));
verifyChecksum = !cf.getOpt("ignoreCrc"); setVerifyChecksum(!cf.getOpt("ignoreCrc"));
setRecursive(true); setRecursive(true);
getLocalDestination(args); getLocalDestination(args);
} }
@Override
protected void copyFileToTarget(PathData src, PathData target)
throws IOException {
src.fs.setVerifyChecksum(verifyChecksum);
if (copyCrc && !(src.fs instanceof ChecksumFileSystem)) {
displayWarning(src.fs + ": Does not support checksums");
copyCrc = false;
}
super.copyFileToTarget(src, target);
if (copyCrc) {
// should we delete real file if crc copy fails?
super.copyFileToTarget(src.getChecksumFile(), target.getChecksumFile());
}
}
} }
/** /**

View File

@ -27,7 +27,6 @@ import java.net.URISyntaxException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocalFileSystem;
@ -169,19 +168,6 @@ public class PathData {
} }
} }
/**
* Return the corresponding crc data for a file. Avoids exposing the fs
* contortions to the caller.
* @return PathData of the crc file
* @throws IOException is anything goes wrong
*/
public PathData getChecksumFile() throws IOException {
checkIfExists(FileTypeRequirement.SHOULD_NOT_BE_DIRECTORY);
ChecksumFileSystem srcFs = (ChecksumFileSystem)fs;
Path srcPath = srcFs.getChecksumFile(path);
return new PathData(srcFs.getRawFileSystem(), srcPath.toString());
}
/** /**
* Returns a temporary file for this PathData with the given extension. * Returns a temporary file for this PathData with the given extension.
* The file will be deleted on exit. * The file will be deleted on exit.

View File

@ -470,6 +470,15 @@ public class ViewFileSystem extends FileSystem {
} }
} }
@Override
public void setWriteChecksum(final boolean writeChecksum) {
List<InodeTree.MountPoint<FileSystem>> mountPoints =
fsState.getMountPoints();
for (InodeTree.MountPoint<FileSystem> mount : mountPoints) {
mount.target.targetFileSystem.setWriteChecksum(writeChecksum);
}
}
public MountPoint[] getMountPoints() { public MountPoint[] getMountPoints() {
List<InodeTree.MountPoint<FileSystem>> mountPoints = List<InodeTree.MountPoint<FileSystem>> mountPoints =
fsState.getMountPoints(); fsState.getMountPoints();

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import static org.junit.Assert.*;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestFsShellCopy {
static Configuration conf;
static FsShell shell;
static LocalFileSystem lfs;
static Path testRootDir, srcPath, dstPath;
@BeforeClass
public static void setup() throws Exception {
conf = new Configuration();
shell = new FsShell(conf);
lfs = FileSystem.getLocal(conf);
testRootDir = new Path(
System.getProperty("test.build.data","test/build/data"), "testShellCopy");
lfs.mkdirs(testRootDir);
srcPath = new Path(testRootDir, "srcFile");
dstPath = new Path(testRootDir, "dstFile");
}
@Before
public void prepFiles() throws Exception {
lfs.setVerifyChecksum(true);
lfs.setWriteChecksum(true);
lfs.delete(srcPath, true);
lfs.delete(dstPath, true);
FSDataOutputStream out = lfs.create(srcPath);
out.writeChars("hi");
out.close();
assertTrue(lfs.exists(lfs.getChecksumFile(srcPath)));
}
@Test
public void testCopyNoCrc() throws Exception {
shellRun(0, "-get", srcPath.toString(), dstPath.toString());
checkPath(dstPath, false);
}
@Test
public void testCopyCrc() throws Exception {
shellRun(0, "-get", "-crc", srcPath.toString(), dstPath.toString());
checkPath(dstPath, true);
}
@Test
public void testCorruptedCopyCrc() throws Exception {
FSDataOutputStream out = lfs.getRawFileSystem().create(srcPath);
out.writeChars("bang");
out.close();
shellRun(1, "-get", srcPath.toString(), dstPath.toString());
}
@Test
public void testCorruptedCopyIgnoreCrc() throws Exception {
shellRun(0, "-get", "-ignoreCrc", srcPath.toString(), dstPath.toString());
checkPath(dstPath, false);
}
private void checkPath(Path p, boolean expectChecksum) throws IOException {
assertTrue(lfs.exists(p));
boolean hasChecksum = lfs.exists(lfs.getChecksumFile(p));
assertEquals(expectChecksum, hasChecksum);
}
private void shellRun(int n, String ... args) throws Exception {
assertEquals(n, shell.run(args));
}
}