HDFS-7385. ThreadLocal used in FSEditLog class causes FSImage permission mess up. Contributed by jiangyu.

(cherry picked from commit b0a41de68c)
(cherry picked from commit 4e9001530f)
This commit is contained in:
cnauroth 2014-11-13 12:31:44 -08:00
parent 4e9b46da11
commit ed2a4db6d4
4 changed files with 84 additions and 0 deletions

View File

@ -1023,6 +1023,9 @@ Release 2.6.0 - 2014-11-15
HDFS-7391. Renable SSLv2Hello in HttpFS. (rkanter via acmurthy) HDFS-7391. Renable SSLv2Hello in HttpFS. (rkanter via acmurthy)
HDFS-7385. ThreadLocal used in FSEditLog class causes FSImage permission mess
up. (jiangyu via cnauroth)
Release 2.5.2 - 2014-11-10 Release 2.5.2 - 2014-11-10
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -708,6 +708,7 @@ public class FSEditLog implements LogsPurgeable {
Preconditions.checkArgument(newNode.isUnderConstruction()); Preconditions.checkArgument(newNode.isUnderConstruction());
PermissionStatus permissions = newNode.getPermissionStatus(); PermissionStatus permissions = newNode.getPermissionStatus();
AddOp op = AddOp.getInstance(cache.get()) AddOp op = AddOp.getInstance(cache.get())
.reset()
.setInodeId(newNode.getId()) .setInodeId(newNode.getId())
.setPath(path) .setPath(path)
.setReplication(newNode.getFileReplication()) .setReplication(newNode.getFileReplication())
@ -778,6 +779,7 @@ public class FSEditLog implements LogsPurgeable {
public void logMkDir(String path, INode newNode) { public void logMkDir(String path, INode newNode) {
PermissionStatus permissions = newNode.getPermissionStatus(); PermissionStatus permissions = newNode.getPermissionStatus();
MkdirOp op = MkdirOp.getInstance(cache.get()) MkdirOp op = MkdirOp.getInstance(cache.get())
.reset()
.setInodeId(newNode.getId()) .setInodeId(newNode.getId())
.setPath(path) .setPath(path)
.setTimestamp(newNode.getModificationTime()) .setTimestamp(newNode.getModificationTime())

View File

@ -421,6 +421,12 @@ public abstract class FSEditLogOp {
assert(opCode == OP_ADD || opCode == OP_CLOSE); assert(opCode == OP_ADD || opCode == OP_CLOSE);
} }
<T extends AddCloseOp> T reset() {
this.aclEntries = null;
this.xAttrs = null;
return (T)this;
}
<T extends AddCloseOp> T setInodeId(long inodeId) { <T extends AddCloseOp> T setInodeId(long inodeId) {
this.inodeId = inodeId; this.inodeId = inodeId;
return (T)this; return (T)this;
@ -1412,6 +1418,12 @@ public abstract class FSEditLogOp {
return (MkdirOp)cache.get(OP_MKDIR); return (MkdirOp)cache.get(OP_MKDIR);
} }
MkdirOp reset() {
this.aclEntries = null;
this.xAttrs = null;
return this;
}
MkdirOp setInodeId(long inodeId) { MkdirOp setInodeId(long inodeId) {
this.inodeId = inodeId; this.inodeId = inodeId;
return this; return this;

View File

@ -17,9 +17,14 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.fs.permission.AclEntryScope.*;
import static org.apache.hadoop.fs.permission.AclEntryType.*;
import static org.apache.hadoop.fs.permission.FsAction.*;
import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.*;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -57,6 +62,7 @@ import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -1499,4 +1505,65 @@ public class TestEditLog {
LOG.info(String.format("loaded %d edit log segments in %.2f seconds", LOG.info(String.format("loaded %d edit log segments in %.2f seconds",
NUM_EDIT_LOG_ROLLS, delta)); NUM_EDIT_LOG_ROLLS, delta));
} }
/**
* Edit log op instances are cached internally using thread-local storage.
* This test checks that the cached instances are reset in between different
* transactions processed on the same thread, so that we don't accidentally
* apply incorrect attributes to an inode.
*
* @throws IOException if there is an I/O error
*/
@Test
public void testResetThreadLocalCachedOps() throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
// Set single handler thread, so all transactions hit same thread-local ops.
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 1);
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fileSys = cluster.getFileSystem();
// Create /dir1 with a default ACL.
Path dir1 = new Path("/dir1");
fileSys.mkdirs(dir1);
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", READ_EXECUTE));
fileSys.modifyAclEntries(dir1, aclSpec);
// /dir1/dir2 is expected to clone the default ACL.
Path dir2 = new Path("/dir1/dir2");
fileSys.mkdirs(dir2);
// /dir1/file1 is expected to clone the default ACL.
Path file1 = new Path("/dir1/file1");
fileSys.create(file1).close();
// /dir3 is not a child of /dir1, so must not clone the default ACL.
Path dir3 = new Path("/dir3");
fileSys.mkdirs(dir3);
// /file2 is not a child of /dir1, so must not clone the default ACL.
Path file2 = new Path("/file2");
fileSys.create(file2).close();
// Restart and assert the above stated expectations.
IOUtils.cleanup(LOG, fileSys);
cluster.restartNameNode();
fileSys = cluster.getFileSystem();
assertFalse(fileSys.getAclStatus(dir1).getEntries().isEmpty());
assertFalse(fileSys.getAclStatus(dir2).getEntries().isEmpty());
assertFalse(fileSys.getAclStatus(file1).getEntries().isEmpty());
assertTrue(fileSys.getAclStatus(dir3).getEntries().isEmpty());
assertTrue(fileSys.getAclStatus(file2).getEntries().isEmpty());
} finally {
IOUtils.cleanup(LOG, fileSys);
if (cluster != null) {
cluster.shutdown();
}
}
}
} }