HADOOP-13052. ChecksumFileSystem mishandles crc file permissions. Contributed by Daryn Sharp.

(cherry picked from commit 9dbdc8e12d)
This commit is contained in:
Kihwal Lee 2016-04-22 15:03:56 -05:00
parent 45ff579bfa
commit d0718ed466
2 changed files with 134 additions and 15 deletions

View File

@ -24,11 +24,13 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
@ -155,11 +157,14 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
throw new IOException("Not a checksum file: "+sumFile); throw new IOException("Not a checksum file: "+sumFile);
this.bytesPerSum = sums.readInt(); this.bytesPerSum = sums.readInt();
set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4); set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
} catch (FileNotFoundException e) { // quietly ignore } catch (IOException e) {
set(fs.verifyChecksum, null, 1, 0); // mincing the message is terrible, but java throws permission
} catch (IOException e) { // loudly ignore // exceptions as FNF because that's all the method signatures allow!
LOG.warn("Problem opening checksum file: "+ file + if (!(e instanceof FileNotFoundException) ||
". Ignoring exception: " , e); e.getMessage().endsWith(" (Permission denied)")) {
LOG.warn("Problem opening checksum file: "+ file +
". Ignoring exception: " , e);
}
set(fs.verifyChecksum, null, 1, 0); set(fs.verifyChecksum, null, 1, 0);
} }
} }
@ -478,6 +483,103 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
blockSize, progress); blockSize, progress);
} }
abstract class FsOperation {
boolean run(Path p) throws IOException {
boolean status = apply(p);
if (status) {
Path checkFile = getChecksumFile(p);
if (fs.exists(checkFile)) {
apply(checkFile);
}
}
return status;
}
abstract boolean apply(Path p) throws IOException;
}
@Override
public void setPermission(Path src, final FsPermission permission)
throws IOException {
new FsOperation(){
@Override
boolean apply(Path p) throws IOException {
fs.setPermission(p, permission);
return true;
}
}.run(src);
}
@Override
public void setOwner(Path src, final String username, final String groupname)
throws IOException {
new FsOperation(){
@Override
boolean apply(Path p) throws IOException {
fs.setOwner(p, username, groupname);
return true;
}
}.run(src);
}
@Override
public void setAcl(Path src, final List<AclEntry> aclSpec)
throws IOException {
new FsOperation(){
@Override
boolean apply(Path p) throws IOException {
fs.setAcl(p, aclSpec);
return true;
}
}.run(src);
}
@Override
public void modifyAclEntries(Path src, final List<AclEntry> aclSpec)
throws IOException {
new FsOperation(){
@Override
boolean apply(Path p) throws IOException {
fs.modifyAclEntries(p, aclSpec);
return true;
}
}.run(src);
}
@Override
public void removeAcl(Path src) throws IOException {
new FsOperation(){
@Override
boolean apply(Path p) throws IOException {
fs.removeAcl(p);
return true;
}
}.run(src);
}
@Override
public void removeAclEntries(Path src, final List<AclEntry> aclSpec)
throws IOException {
new FsOperation(){
@Override
boolean apply(Path p) throws IOException {
fs.removeAclEntries(p, aclSpec);
return true;
}
}.run(src);
}
@Override
public void removeDefaultAcl(Path src) throws IOException {
new FsOperation(){
@Override
boolean apply(Path p) throws IOException {
fs.removeDefaultAcl(p);
return true;
}
}.run(src);
}
/** /**
* Set replication for an existing file. * Set replication for an existing file.
* Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt> * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
@ -488,16 +590,14 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
* false if file does not exist or is a directory * false if file does not exist or is a directory
*/ */
@Override @Override
public boolean setReplication(Path src, short replication) throws IOException { public boolean setReplication(Path src, final short replication)
boolean value = fs.setReplication(src, replication); throws IOException {
if (!value) return new FsOperation(){
return false; @Override
boolean apply(Path p) throws IOException {
Path checkFile = getChecksumFile(src); return fs.setReplication(p, replication);
if (exists(checkFile)) }
fs.setReplication(checkFile, replication); }.run(src);
return true;
} }
/** /**

View File

@ -18,8 +18,11 @@
package org.apache.hadoop.fs; package org.apache.hadoop.fs;
import java.util.Arrays;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.permission.FsPermission;
import static org.apache.hadoop.fs.FileSystemTestHelper.*; import static org.apache.hadoop.fs.FileSystemTestHelper.*;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
@ -281,4 +284,20 @@ public class TestChecksumFileSystem {
assertTrue(localFs.rename(srcPath, dstPath)); assertTrue(localFs.rename(srcPath, dstPath));
assertTrue(localFs.exists(localFs.getChecksumFile(realDstPath))); assertTrue(localFs.exists(localFs.getChecksumFile(realDstPath)));
} }
@Test
public void testSetPermissionCrc() throws Exception {
FileSystem rawFs = localFs.getRawFileSystem();
Path p = new Path(TEST_ROOT_DIR, "testCrcPermissions");
localFs.createNewFile(p);
Path crc = localFs.getChecksumFile(p);
assert(rawFs.exists(crc));
for (short mode : Arrays.asList((short)0666, (short)0660, (short)0600)) {
FsPermission perm = new FsPermission(mode);
localFs.setPermission(p, perm);
assertEquals(perm, localFs.getFileStatus(p).getPermission());
assertEquals(perm, rawFs.getFileStatus(crc).getPermission());
}
}
} }