HDFS-5619. NameNode: record ACL modifications to edit log. Contributed by Haohui Mai.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4685@1553224 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2013-12-23 22:12:45 +00:00
parent 25e9d2e895
commit c3b56ed1c8
11 changed files with 253 additions and 20 deletions

View File

@ -18,6 +18,9 @@ HDFS-4685 (Unreleased)
HDFS-5618. NameNode: persist ACLs in fsimage. (Haohui Mai via cnauroth)
HDFS-5619. NameNode: record ACL modifications to edit log.
(Haohui Mai via cnauroth)
OPTIMIZATIONS
BUG FIXES

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
@ -28,6 +29,8 @@ import org.apache.hadoop.fs.permission.AclEntry;
*/
@InterfaceAudience.Private
public class AclFeature implements INode.Feature {
public static final List<AclEntry> EMPTY_ENTRY_LIST = Collections.emptyList();
private List<AclEntry> entries;
public List<AclEntry> getEntries() {

View File

@ -77,7 +77,6 @@ import org.apache.hadoop.hdfs.util.ReadOnlyList;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/*************************************************
* FSDirectory stores the filesystem directory state.
@ -2640,28 +2639,50 @@ public class FSDirectory implements Closeable {
void removeAcl(String src) throws IOException {
writeLock();
try {
final INodeWithAdditionalFields node = resolveINodeWithAdditionalField(src);
AclFeature f = node.getAclFeature();
if (f != null)
node.removeAclFeature();
unprotectedRemoveAcl(src);
fsImage.getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
} finally {
writeUnlock();
}
}
void setAcl(String src, Iterable<AclEntry> aclSpec) throws IOException {
writeLock();
try {
private void unprotectedRemoveAcl(String src) throws UnresolvedLinkException,
SnapshotAccessControlException, FileNotFoundException {
assert hasWriteLock();
final INodeWithAdditionalFields node = resolveINodeWithAdditionalField(src);
AclFeature f = node.getAclFeature();
if (f != null)
node.removeAclFeature();
}
void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
writeLock();
try {
unprotectedSetAcl(src, aclSpec);
fsImage.getEditLog().logSetAcl(src, aclSpec);
} finally {
writeUnlock();
}
}
void unprotectedSetAcl(String src, List<AclEntry> aclSpec)
throws UnresolvedLinkException, SnapshotAccessControlException,
FileNotFoundException {
assert hasWriteLock();
final INodeWithAdditionalFields node = resolveINodeWithAdditionalField(src);
AclFeature f = node.getAclFeature();
if (aclSpec.size() == 0) {
if (f != null)
node.removeAclFeature();
return;
}
if (f == null) {
f = new AclFeature();
node.addAclFeature(f);
}
f.setEntries(Lists.newArrayList(aclSpec));
} finally {
writeUnlock();
}
f.setEntries(aclSpec);
}
AclStatus getAclStatus(String src) throws IOException {

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
@ -68,6 +69,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetAclOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
@ -1000,6 +1002,13 @@ public class FSEditLog implements LogsPurgeable {
logEdit(op);
}
void logSetAcl(String src, List<AclEntry> entries) {
SetAclOp op = SetAclOp.getInstance();
op.src = src;
op.aclEntries = entries;
logEdit(op);
}
/**
* Get all the journals this edit log is currently operating on.
*/

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetAclOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetNSQuotaOp;
@ -691,6 +692,11 @@ public class FSEditLogLoader {
}
break;
}
case OP_SET_ACL: {
SetAclOp setAclOp = (SetAclOp) op;
fsDir.unprotectedSetAcl(setAclOp.src, setAclOp.aclEntries);
break;
}
default:
throw new IOException("Invalid operation read " + op.opCode);
}

View File

@ -43,6 +43,7 @@ import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENEW_DELEGATION_TOKEN;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_ACL;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V1;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V2;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_NS_QUOTA;
@ -75,6 +76,10 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -86,6 +91,8 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEditLogProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.util.XMLUtils;
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
@ -108,6 +115,8 @@ import org.xml.sax.helpers.AttributesImpl;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
/**
* Helper classes for reading the ops from an InputStream.
@ -166,6 +175,7 @@ public abstract class FSEditLogOp {
inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
inst.put(OP_REMOVE_CACHE_POOL, new RemoveCachePoolOp());
inst.put(OP_SET_ACL, new SetAclOp());
}
public FSEditLogOp get(FSEditLogOpCodes opcode) {
@ -173,6 +183,16 @@ public abstract class FSEditLogOp {
}
}
private static ImmutableMap<String, FsAction> fsActionMap() {
ImmutableMap.Builder<String, FsAction> b = ImmutableMap.builder();
for (FsAction v : FsAction.values())
b.put(v.SYMBOL, v);
return b.build();
}
private static final ImmutableMap<String, FsAction> FSACTION_SYMBOL_MAP
= fsActionMap();
/**
* Constructor for an EditLog Op. EditLog ops cannot be constructed
* directly, but only through Reader#readOp.
@ -3232,6 +3252,65 @@ public abstract class FSEditLogOp {
}
}
static class SetAclOp extends FSEditLogOp {
List<AclEntry> aclEntries = Lists.newArrayList();
String src;
private SetAclOp() {
super(OP_SET_ACL);
}
static SetAclOp getInstance() {
return new SetAclOp();
}
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
AclEditLogProto p = AclEditLogProto.parseDelimitedFrom((DataInputStream)in);
src = p.getSrc();
aclEntries = PBHelper.convertAclEntry(p.getEntriesList());
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
AclEditLogProto.Builder b = AclEditLogProto.newBuilder();
if (src != null)
b.setSrc(src);
b.addAllEntries(PBHelper.convertAclEntryProto(aclEntries));
b.build().writeDelimitedTo(out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "SRC", src);
for (AclEntry e : aclEntries) {
contentHandler.startElement("", "", "ENTRY", new AttributesImpl());
XMLUtils.addSaxString(contentHandler, "SCOPE", e.getScope().name());
XMLUtils.addSaxString(contentHandler, "TYPE", e.getType().name());
XMLUtils.addSaxString(contentHandler, "NAME", e.getName());
fsActionToXml(contentHandler, e.getPermission());
contentHandler.endElement("", "", "ENTRY");
}
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
src = st.getValue("SRC");
if (!st.hasChildren("ENTRY"))
return;
List<Stanza> stanzas = st.getChildren("ENTRY");
for (Stanza s : stanzas) {
AclEntry e = new AclEntry.Builder()
.setScope(AclEntryScope.valueOf(s.getValue("SCOPE")))
.setType(AclEntryType.valueOf(s.getValue("TYPE")))
.setName(s.getValue("NAME"))
.setPermission(fsActionFromXml(s)).build();
aclEntries.add(e);
}
}
}
static private short readShort(DataInputStream in) throws IOException {
return Short.parseShort(FSImageSerialization.readString(in));
}
@ -3641,4 +3720,16 @@ public abstract class FSEditLogOp {
short mode = Short.valueOf(st.getValue("MODE"));
return new FsPermission(mode);
}
private static void fsActionToXml(ContentHandler contentHandler, FsAction v)
throws SAXException {
XMLUtils.addSaxString(contentHandler, "PERM", v.SYMBOL);
}
private static FsAction fsActionFromXml(Stanza st) throws InvalidXmlException {
FsAction v = FSACTION_SYMBOL_MAP.get(st.getValue("PERM"));
if (v == null)
throw new InvalidXmlException("Invalid value for FsAction");
return v;
}
}

View File

@ -66,6 +66,7 @@ public enum FSEditLogOpCodes {
OP_MODIFY_CACHE_POOL ((byte) 36),
OP_REMOVE_CACHE_POOL ((byte) 37),
OP_MODIFY_CACHE_DIRECTIVE ((byte) 38),
OP_SET_ACL ((byte) 39),
// Note that fromByte(..) depends on OP_INVALID being at the last position.
OP_INVALID ((byte) -1);

View File

@ -7327,7 +7327,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
void setAcl(String src, Iterable<AclEntry> aclSpec) throws IOException {
void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
checkOperation(OperationCategory.WRITE);
writeLock();
try {

View File

@ -64,6 +64,11 @@ message AclFsImageProto {
repeated AclEntryProto entries = 1;
}
message AclEditLogProto {
required string src = 1;
repeated AclEntryProto entries = 2;
}
message ModifyAclEntriesRequestProto {
required string src = 1;
repeated AclEntryProto aclSpec = 2;

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -27,7 +29,7 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
@ -1061,6 +1063,8 @@ public class DFSTestUtil {
filesystem.removeCacheDirective(id);
// OP_REMOVE_CACHE_POOL
filesystem.removeCachePool("pool1");
// OP_SET_ACL
filesystem.setAcl(pathConcatTarget, Lists.<AclEntry> newArrayList());
}
public static void abortStream(DFSOutputStream out) throws IOException {

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.Lists;
public class TestFSImageWithAcl {
private static Configuration conf;
private static MiniDFSCluster cluster;
@BeforeClass
public static void setUp() throws IOException {
conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
}
@AfterClass
public static void tearDown() {
cluster.shutdown();
}
private void testAcl(boolean persistNamespace) throws IOException {
Path p = new Path("/p");
DistributedFileSystem fs = cluster.getFileSystem();
fs.create(p).close();
fs.mkdirs(new Path("/23"));
AclEntry e = new AclEntry.Builder().setName("foo")
.setPermission(FsAction.READ_EXECUTE).setScope(AclEntryScope.DEFAULT)
.setType(AclEntryType.OTHER).build();
fs.setAcl(p, Lists.newArrayList(e));
if (persistNamespace) {
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
fs.saveNamespace();
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
}
cluster.restartNameNode();
cluster.waitActive();
AclStatus s = cluster.getNamesystem().getAclStatus(p.toString());
AclEntry[] returned = Lists.newArrayList(s.getEntries()).toArray(
new AclEntry[0]);
Assert.assertArrayEquals(new AclEntry[] { e }, returned);
}
@Test
public void testPersistAcl() throws IOException {
testAcl(true);
}
@Test
public void testAclEditLog() throws IOException {
testAcl(false);
}
}